diff --git a/go.mod b/go.mod index 2e18c09..bcb281c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/sunmi-OS/gocore/v2 -go 1.16 +go 1.18 require ( github.com/BurntSushi/toml v1.3.2 diff --git a/lib/tracing/client/otel/exporter/zipkin/env.go b/lib/tracing/client/otel/exporter/zipkin/env.go new file mode 100644 index 0000000..f1afd8e --- /dev/null +++ b/lib/tracing/client/otel/exporter/zipkin/env.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin // import "go.opentelemetry.io/otel/exporters/zipkin" + +import "os" + +// Environment variable names. +const ( + // Endpoint for Zipkin collector. + envEndpoint = "OTEL_EXPORTER_ZIPKIN_ENDPOINT" +) + +// envOr returns an env variable's value if it is exists or the default if not. +func envOr(key, defaultValue string) string { + if v := os.Getenv(key); v != "" { + return v + } + return defaultValue +} diff --git a/lib/tracing/client/otel/exporter/zipkin/model.go b/lib/tracing/client/otel/exporter/zipkin/model.go new file mode 100644 index 0000000..1ac80c1 --- /dev/null +++ b/lib/tracing/client/otel/exporter/zipkin/model.go @@ -0,0 +1,311 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin // import "go.opentelemetry.io/otel/exporters/zipkin" + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "net" + "strconv" + "strings" + + zkmodel "github.com/openzipkin/zipkin-go/model" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "go.opentelemetry.io/otel/trace" +) + +const ( + keyInstrumentationLibraryName = "otel.library.name" + keyInstrumentationLibraryVersion = "otel.library.version" + + keyPeerHostname attribute.Key = "peer.hostname" + keyPeerAddress attribute.Key = "peer.address" +) + +var defaultServiceName string + +func init() { + // fetch service.name from default resource for backup + defaultResource := resource.Default() + if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists { + defaultServiceName = value.AsString() + } +} + +// SpanModels converts OpenTelemetry spans into Zipkin model spans. +// This is used for exporting to Zipkin compatible tracing services. +func SpanModels(batch []tracesdk.ReadOnlySpan) []zkmodel.SpanModel { + models := make([]zkmodel.SpanModel, 0, len(batch)) + for _, data := range batch { + models = append(models, toZipkinSpanModel(data)) + } + return models +} + +func getServiceName(attrs []attribute.KeyValue) string { + for _, kv := range attrs { + if kv.Key == semconv.ServiceNameKey { + return kv.Value.AsString() + } + } + + return defaultServiceName +} + +func toZipkinSpanModel(data tracesdk.ReadOnlySpan) zkmodel.SpanModel { + return zkmodel.SpanModel{ + SpanContext: toZipkinSpanContext(data), + Name: data.Name(), + Kind: toZipkinKind(data.SpanKind()), + Timestamp: data.StartTime(), + Duration: data.EndTime().Sub(data.StartTime()), + Shared: false, + LocalEndpoint: &zkmodel.Endpoint{ + ServiceName: getServiceName(data.Resource().Attributes()), + }, + RemoteEndpoint: toZipkinRemoteEndpoint(data), + Annotations: toZipkinAnnotations(data.Events()), + Tags: toZipkinTags(data), + } +} + +func toZipkinSpanContext(data tracesdk.ReadOnlySpan) zkmodel.SpanContext { + return zkmodel.SpanContext{ + TraceID: toZipkinTraceID(data.SpanContext().TraceID()), + ID: toZipkinID(data.SpanContext().SpanID()), + ParentID: toZipkinParentID(data.Parent().SpanID()), + Debug: false, + Sampled: nil, + Err: nil, + } +} + +func toZipkinTraceID(traceID trace.TraceID) zkmodel.TraceID { + return zkmodel.TraceID{ + High: binary.BigEndian.Uint64(traceID[:8]), + Low: binary.BigEndian.Uint64(traceID[8:]), + } +} + +func toZipkinID(spanID trace.SpanID) zkmodel.ID { + return zkmodel.ID(binary.BigEndian.Uint64(spanID[:])) +} + +func toZipkinParentID(spanID trace.SpanID) *zkmodel.ID { + if spanID.IsValid() { + id := toZipkinID(spanID) + return &id + } + return nil +} + +func toZipkinKind(kind trace.SpanKind) zkmodel.Kind { + switch kind { + case trace.SpanKindUnspecified: + return zkmodel.Undetermined + case trace.SpanKindInternal: + // The spec says we should set the kind to nil, but + // the model does not allow that. + return zkmodel.Undetermined + case trace.SpanKindServer: + return zkmodel.Server + case trace.SpanKindClient: + return zkmodel.Client + case trace.SpanKindProducer: + return zkmodel.Producer + case trace.SpanKindConsumer: + return zkmodel.Consumer + } + return zkmodel.Undetermined +} + +func toZipkinAnnotations(events []tracesdk.Event) []zkmodel.Annotation { + if len(events) == 0 { + return nil + } + annotations := make([]zkmodel.Annotation, 0, len(events)) + for _, event := range events { + value := event.Name + if len(event.Attributes) > 0 { + jsonString := attributesToJSONMapString(event.Attributes) + if jsonString != "" { + value = fmt.Sprintf("%s: %s", event.Name, jsonString) + } + } + annotations = append(annotations, zkmodel.Annotation{ + Timestamp: event.Time, + Value: value, + }) + } + return annotations +} + +func attributesToJSONMapString(attributes []attribute.KeyValue) string { + m := make(map[string]interface{}, len(attributes)) + for _, a := range attributes { + m[(string)(a.Key)] = a.Value.AsInterface() + } + // if an error happens, the result will be an empty string + jsonBytes, _ := json.Marshal(m) + return (string)(jsonBytes) +} + +// attributeToStringPair serializes each attribute to a string pair. +func attributeToStringPair(kv attribute.KeyValue) (string, string) { + switch kv.Value.Type() { + // For slice attributes, serialize as JSON list string. + case attribute.BOOLSLICE: + data, _ := json.Marshal(kv.Value.AsBoolSlice()) + return (string)(kv.Key), (string)(data) + case attribute.INT64SLICE: + data, _ := json.Marshal(kv.Value.AsInt64Slice()) + return (string)(kv.Key), (string)(data) + case attribute.FLOAT64SLICE: + data, _ := json.Marshal(kv.Value.AsFloat64Slice()) + return (string)(kv.Key), (string)(data) + case attribute.STRINGSLICE: + data, _ := json.Marshal(kv.Value.AsStringSlice()) + return (string)(kv.Key), (string)(data) + default: + return (string)(kv.Key), kv.Value.Emit() + } +} + +// extraZipkinTags are those that may be added to every outgoing span. +var extraZipkinTags = []string{ + "otel.status_code", + keyInstrumentationLibraryName, + keyInstrumentationLibraryVersion, +} + +func toZipkinTags(data tracesdk.ReadOnlySpan) map[string]string { + attr := data.Attributes() + resourceAttr := data.Resource().Attributes() + m := make(map[string]string, len(attr)+len(resourceAttr)+len(extraZipkinTags)) + for _, kv := range attr { + k, v := attributeToStringPair(kv) + m[k] = v + } + for _, kv := range resourceAttr { + k, v := attributeToStringPair(kv) + m[k] = v + } + + if data.Status().Code != codes.Unset { + // Zipkin expect to receive uppercase status values + // rather than default capitalized ones. + m["otel.status_code"] = strings.ToUpper(data.Status().Code.String()) + } + + if data.Status().Code == codes.Error { + m["error"] = data.Status().Description + } else { + delete(m, "error") + } + + if is := data.InstrumentationScope(); is.Name != "" { + m[keyInstrumentationLibraryName] = is.Name + if is.Version != "" { + m[keyInstrumentationLibraryVersion] = is.Version + } + } + + if len(m) == 0 { + return nil + } + + return m +} + +// Rank determines selection order for remote endpoint. See the specification +// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/trace/sdk_exporters/zipkin.md#otlp---zipkin +var remoteEndpointKeyRank = map[attribute.Key]int{ + semconv.PeerServiceKey: 0, + semconv.NetPeerNameKey: 1, + semconv.NetSockPeerNameKey: 2, + semconv.NetSockPeerAddrKey: 3, + keyPeerHostname: 4, + keyPeerAddress: 5, + semconv.DBNameKey: 6, +} + +func toZipkinRemoteEndpoint(data tracesdk.ReadOnlySpan) *zkmodel.Endpoint { + // Should be set only for client or producer kind + if sk := data.SpanKind(); sk != trace.SpanKindClient && sk != trace.SpanKindProducer { + return nil + } + + attr := data.Attributes() + var endpointAttr attribute.KeyValue + for _, kv := range attr { + rank, ok := remoteEndpointKeyRank[kv.Key] + if !ok { + continue + } + + currentKeyRank, ok := remoteEndpointKeyRank[endpointAttr.Key] + if ok && rank < currentKeyRank { + endpointAttr = kv + } else if !ok { + endpointAttr = kv + } + } + + if endpointAttr.Key == "" { + return nil + } + + if endpointAttr.Key != semconv.NetSockPeerAddrKey && + endpointAttr.Value.Type() == attribute.STRING { + return &zkmodel.Endpoint{ + ServiceName: endpointAttr.Value.AsString(), + } + } + + return remoteEndpointPeerIPWithPort(endpointAttr.Value.AsString(), attr) +} + +// Handles `net.peer.ip` remote endpoint separately (should include `net.peer.ip` +// as well, if available). +func remoteEndpointPeerIPWithPort(peerIP string, attrs []attribute.KeyValue) *zkmodel.Endpoint { + ip := net.ParseIP(peerIP) + if ip == nil { + return nil + } + + endpoint := &zkmodel.Endpoint{} + // Determine if IPv4 or IPv6 + if ip.To4() != nil { + endpoint.IPv4 = ip + } else { + endpoint.IPv6 = ip + } + + for _, kv := range attrs { + if kv.Key == semconv.NetSockPeerPortKey { + port, _ := strconv.ParseUint(kv.Value.Emit(), 10, 16) + endpoint.Port = uint16(port) + return endpoint + } + } + + return endpoint +} diff --git a/lib/tracing/client/otel/exporter/zipkin/zipkin.go b/lib/tracing/client/otel/exporter/zipkin/zipkin.go new file mode 100644 index 0000000..e2f2c34 --- /dev/null +++ b/lib/tracing/client/otel/exporter/zipkin/zipkin.go @@ -0,0 +1,206 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin // import "go.opentelemetry.io/otel/exporters/zipkin" + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "sync" + + "github.com/go-logr/logr" + "github.com/go-logr/stdr" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +const ( + defaultCollectorURL = "http://localhost:9411/api/v2/spans" +) + +// Exporter exports spans to the zipkin collector. +type Exporter struct { + url string + client *http.Client + logger logr.Logger + + stoppedMu sync.RWMutex + stopped bool +} + +var _ sdktrace.SpanExporter = &Exporter{} + +var emptyLogger = logr.Logger{} + +// Options contains configuration for the exporter. +type config struct { + client *http.Client + logger logr.Logger +} + +// Option defines a function that configures the exporter. +type Option interface { + apply(config) config +} + +type optionFunc func(config) config + +func (fn optionFunc) apply(cfg config) config { + return fn(cfg) +} + +// WithLogger configures the exporter to use the passed logger. +// WithLogger and WithLogr will overwrite each other. +func WithLogger(logger *log.Logger) Option { + return WithLogr(stdr.New(logger)) +} + +// WithLogr configures the exporter to use the passed logr.Logger. +// WithLogr and WithLogger will overwrite each other. +func WithLogr(logger logr.Logger) Option { + return optionFunc(func(cfg config) config { + cfg.logger = logger + return cfg + }) +} + +// WithClient configures the exporter to use the passed HTTP client. +func WithClient(client *http.Client) Option { + return optionFunc(func(cfg config) config { + cfg.client = client + return cfg + }) +} + +// New creates a new Zipkin exporter. +func New(collectorURL string, opts ...Option) (*Exporter, error) { + if collectorURL == "" { + // Use endpoint from env var or default collector URL. + collectorURL = envOr(envEndpoint, defaultCollectorURL) + } + u, err := url.Parse(collectorURL) + if err != nil { + return nil, fmt.Errorf("invalid collector URL %q: %v", collectorURL, err) + } + if u.Scheme == "" || u.Host == "" { + return nil, fmt.Errorf("invalid collector URL %q: no scheme or host", collectorURL) + } + + cfg := config{} + for _, opt := range opts { + cfg = opt.apply(cfg) + } + + if cfg.client == nil { + cfg.client = http.DefaultClient + } + return &Exporter{ + url: collectorURL, + client: cfg.client, + logger: cfg.logger, + }, nil +} + +// ExportSpans exports spans to a Zipkin receiver. +func (e *Exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + e.stoppedMu.RLock() + stopped := e.stopped + e.stoppedMu.RUnlock() + if stopped { + e.logf("exporter stopped, not exporting span batch") + return nil + } + + if len(spans) == 0 { + e.logf("no spans to export") + return nil + } + models := SpanModels(spans) + body, err := json.Marshal(models) + if err != nil { + return e.errf("failed to serialize zipkin models to JSON: %v", err) + } + e.logf("about to send a POST request to %s with body %s", e.url, body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.url, bytes.NewBuffer(body)) + if err != nil { + return e.errf("failed to create request to %s: %v", e.url, err) + } + req.Header.Set("Content-Type", "application/json") + resp, err := e.client.Do(req) + if err != nil { + return e.errf("request to %s failed: %v", e.url, err) + } + defer resp.Body.Close() + + // Zipkin API returns a 202 on success and the content of the body isn't interesting + // but it is still being read because according to https://golang.org/pkg/net/http/#Response + // > The default HTTP client's Transport may not reuse HTTP/1.x "keep-alive" TCP connections + // > if the Body is not read to completion and closed. + _, err = io.Copy(io.Discard, resp.Body) + if err != nil { + return e.errf("failed to read response body: %v", err) + } + + // if resp.StatusCode != http.StatusAccepted { + // return e.errf("failed to send spans to zipkin server with status %d", resp.StatusCode) + // } + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + return e.errf("failed to send spans to zipkin server with status %d", resp.StatusCode) + } + + return nil +} + +// Shutdown stops the exporter flushing any pending exports. +func (e *Exporter) Shutdown(ctx context.Context) error { + e.stoppedMu.Lock() + e.stopped = true + e.stoppedMu.Unlock() + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil +} + +func (e *Exporter) logf(format string, args ...interface{}) { + if e.logger != emptyLogger { + e.logger.Info(fmt.Sprintf(format, args...)) + } +} + +func (e *Exporter) errf(format string, args ...interface{}) error { + e.logf(format, args...) + return fmt.Errorf(format, args...) +} + +// MarshalLog is the marshaling function used by the logging system to represent this exporter. +func (e *Exporter) MarshalLog() interface{} { + return struct { + Type string + URL string + }{ + Type: "zipkin", + URL: e.url, + } +} diff --git a/lib/tracing/client/otel/trace.go b/lib/tracing/client/otel/trace.go index d2ba420..7d643f3 100644 --- a/lib/tracing/client/otel/trace.go +++ b/lib/tracing/client/otel/trace.go @@ -6,7 +6,8 @@ import ( "go.opentelemetry.io/contrib/propagators/b3" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/zipkin" + // "go.opentelemetry.io/otel/exporters/zipkin" + "github.com/sunmi-OS/gocore/v2/lib/tracing/client/otel/exporter/zipkin" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" diff --git a/tools/gocore/template/cmd_api.got b/tools/gocore/template/cmd_api.got index 99e344b..b933d06 100644 --- a/tools/gocore/template/cmd_api.got +++ b/tools/gocore/template/cmd_api.got @@ -30,6 +30,7 @@ func RunApi(c *cli.Context) error { initConf() initDB() initCache() + initLog() isDebugMode := true if utils.IsRelease() { diff --git a/tools/gocore/template/cmd_api.got.go b/tools/gocore/template/cmd_api.got.go index fd0bf0b..711d36d 100755 --- a/tools/gocore/template/cmd_api.got.go +++ b/tools/gocore/template/cmd_api.got.go @@ -39,6 +39,7 @@ func RunApi(c *cli.Context) error { initConf() initDB() initCache() + initLog() isDebugMode := true if utils.IsRelease() { diff --git a/utils/slice/slice.go b/utils/slice/slice.go new file mode 100644 index 0000000..fb6fc13 --- /dev/null +++ b/utils/slice/slice.go @@ -0,0 +1,92 @@ +// Portions of this file are derived from go project +// Copyright 2021 The Go Authors. +// License: BSD-style +// Source: https://github.com/golang/go/tree/master/src/slices + +// Package slices defines various functions useful with slices of any type. +package slice + +// Index returns the index of the first occurrence of v in s, +// or -1 if not present. +func Index[S ~[]E, E comparable](s S, v E) int { + for i := range s { + if v == s[i] { + return i + } + } + return -1 +} + +// IndexFunc returns the first index i satisfying f(s[i]), +// or -1 if none do. +func IndexFunc[S ~[]E, E any](s S, f func(E) bool) int { + for i := range s { + if f(s[i]) { + return i + } + } + return -1 +} + +// Contains reports whether v is present in s. +func Contains[S ~[]E, E comparable](s S, v E) bool { + return Index(s, v) >= 0 +} + +// ContainsFunc reports whether at least one element e of s satisfies f(e). +func ContainsFunc[S ~[]E, E any](s S, f func(E) bool) bool { + return IndexFunc(s, f) >= 0 +} + +// Diff return the difference set of s1 and s2, that is, the set of all elements that belong to s1 but not s2. +func Diff[S ~[]E, E comparable](s1, s2 S) S { + set := make(map[E]struct{}) // use an empty struct as the value because it takes up no space + + var diff S + + // add the element from s2 to the map + for _, item := range s2 { + set[item] = struct{}{} + } + + // traverse s1, adding the difference set if the element is not in the map + for _, item := range s1 { + if _, found := set[item]; !found { + diff = append(diff, item) + } + } + + return diff +} + +// Clone returns a copy of the slice. +// The elements are copied using assignment, so this is a shallow clone. +func Clone[S ~[]E, E any](s S) S { + // Preserve nil in case it matters. + if s == nil { + return nil + } + return append(S([]E{}), s...) +} + +// Intersect return the intersection set of s1 and s2, that is, the set of all elements that belong to both s1 and s2. +func Intersect[S ~[]E, E comparable](s1, s2 S) S { + set := make(map[E]struct{}) // use an empty struct as the value because it takes up no space + var intersection S + + // Create a set from the elements of the first slice. + for _, item := range s1 { + set[item] = struct{}{} + } + + // Check if elements of the second slice are in the set. + for _, item := range s2 { + if _, exists := set[item]; exists { + intersection = append(intersection, item) + // Remove the item from the set to prevent duplicates in the intersection. + delete(set, item) + } + } + + return intersection +} diff --git a/utils/slice/slice_test.go b/utils/slice/slice_test.go new file mode 100644 index 0000000..b029f1b --- /dev/null +++ b/utils/slice/slice_test.go @@ -0,0 +1,37 @@ +package slice + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDiff(t *testing.T) { + var validTests = []struct { + s1 []int + s2 []int + s3 []int + }{ + {[]int{1, 2, 3, 4, 5}, []int{4, 5, 6}, []int{1, 2, 3}}, + {[]int{1, 3, 5}, []int{2, 4, 7}, []int{1, 3, 5}}, + } + + for _, tt := range validTests { + assert.Equal(t, tt.s3, Diff(tt.s1, tt.s2)) + } +} + +func TestIntersect(t *testing.T) { + var validTests = []struct { + s1 []int + s2 []int + s3 []int + }{ + {[]int{1, 2, 3, 4, 5}, []int{4, 5, 6}, []int{4, 5}}, + {[]int{1, 3, 5}, []int{1, 2, 4, 7}, []int{1}}, + } + + for _, tt := range validTests { + assert.Equal(t, tt.s3, Intersect(tt.s1, tt.s2)) + } +}