Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented simulator Golang #28

Merged
merged 6 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/docker_build_push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
options:
- httpserver
- kafkaconsumer
- simulator

jobs:
docker_build:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/helm_deploy_application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
options:
- httpserver
- kafkaconsumer
- simulator

jobs:
helm_deploy:
Expand Down
163 changes: 163 additions & 0 deletions apps/golang/commons/otel/http/http_client_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package http

import (
"context"
"net/http"
"time"

semconv "github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

type Opts struct {
Timeout time.Duration
}

type OptFunc func(*Opts)

func defaultOpts() *Opts {
return &Opts{
Timeout: time.Duration(30 * time.Second),
}
}

type HttpClient struct {
Opts *Opts

client *http.Client

tracer trace.Tracer
meter metric.Meter
propagator propagation.TextMapPropagator

latency metric.Float64Histogram
}

func New(
optFuncs ...OptFunc,
) *HttpClient {

// Instantiate options with default values
opts := defaultOpts()

// Apply external options
for _, f := range optFuncs {
f(opts)
}

c := &http.Client{
Timeout: opts.Timeout,
}

// Instantiate trace provider
tracer := otel.GetTracerProvider().Tracer(semconv.HttpClientName)

// Instantiate meter provider
meter := otel.GetMeterProvider().Meter(semconv.HttpClientName)

// Instantiate propagator
propagator := otel.GetTextMapPropagator()

// Create HTTP client latency histogram
latency, err := meter.Float64Histogram(
semconv.HttpClientLatencyName,
metric.WithUnit("ms"),
metric.WithDescription("Measures the duration of HTTP request handling"),
metric.WithExplicitBucketBoundaries(semconv.HttpExplicitBucketBoundaries...),
)
if err != nil {
panic(err)
}

return &HttpClient{
client: c,

tracer: tracer,
meter: meter,
propagator: propagator,

latency: latency,
}
}

// Configure timeout of the HTTP client
func WithTimeout(timeout time.Duration) OptFunc {
return func(opts *Opts) {
opts.Timeout = timeout
}
}

func (c *HttpClient) Do(
ctx context.Context,
req *http.Request,
spanName string,
) (
*http.Response,
error,
) {
requestStartTime := time.Now()

// Parse HTTP attributes from the request for both span and metrics
spanAttrs, metricAttrs := c.getSpanAndMetricServerAttributes(req)

// Create span options
spanOpts := []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(spanAttrs...),
}

// Start HTTP server span
ctx, span := c.tracer.Start(ctx, spanName, spanOpts...)
defer span.End()

// Inject context into the HTTP headers
headers := http.Header{}
carrier := propagation.HeaderCarrier(headers)
otel.GetTextMapPropagator().Inject(ctx, carrier)
for k, v := range headers {
req.Header.Add(k, v[0])
}

res, err := c.client.Do(req)

// Add HTTP status code to the attributes
span.SetAttributes(semconv.HttpResponseStatusCode.Int(res.StatusCode))
metricAttrs = append(metricAttrs, semconv.HttpResponseStatusCode.Int(res.StatusCode))

// Create metric options
metricOpts := metric.WithAttributes(metricAttrs...)

// Record server latency
elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
c.latency.Record(ctx, elapsedTime, metricOpts)

return res, err
}

func (m *HttpClient) getSpanAndMetricServerAttributes(
r *http.Request,
) (
[]attribute.KeyValue,
[]attribute.KeyValue,
) {
spanAttrs := semconv.WithHttpClientAttributes(r)
metricAttrs := make([]attribute.KeyValue, len(spanAttrs))
copy(metricAttrs, spanAttrs)

var url string
if r.URL != nil {
// Remove any username/password info that may be in the URL.
userinfo := r.URL.User
r.URL.User = nil
url = r.URL.String()
// Restore any username/password info that was removed.
r.URL.User = userinfo
}
spanAttrs = append(spanAttrs, semconv.HttpUrlFull.String(url))

return spanAttrs, metricAttrs
}
125 changes: 125 additions & 0 deletions apps/golang/commons/otel/http/http_client_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package http

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"

"github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/otel"
semconv "github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/otel/semconv/v1.24.0"
)

func Test_CommonClientAttributesCreatedSuccessfully(t *testing.T) {
httpMethod := http.MethodPost
host := "localhost"
port := "8080"
userAgent := "test_agent"

// Set headers
headers := http.Header{}
headers.Set("User-Agent", userAgent)

req, err := http.NewRequest(
httpMethod,
fmt.Sprintf("http://%s:%s/", host, port),
nil,
)
if err != nil {
t.Fatal(err)
}
req.Header = headers

c := &HttpClient{}
spanAttrs, metricAttrs := c.getSpanAndMetricServerAttributes(req)

// Check lengths of span and metric attributes
if len(spanAttrs) != len(metricAttrs) {
t.Error("Number of span and metric attributes are not the same!")
}

for i, spanAttr := range spanAttrs {
metricAttr := metricAttrs[i]

// Check span and metric attribute key and value
if spanAttr != metricAttr {
t.Error("Span and metric attribute are not the same!")
}

if spanAttr.Key == semconv.NetworkProtocolVersionName &&
spanAttr.Value.AsString() != "1.1" {
t.Errorf("%s is set incorrectly!", semconv.NetworkProtocolVersionName)
}

if spanAttr.Key == semconv.HttpMethodKeyName &&
spanAttr.Value.AsString() != httpMethod {
t.Errorf("%s is set incorrectly!", semconv.HttpMethodKeyName)
}

if spanAttr.Key == semconv.ServerAddressName &&
spanAttr.Value.AsString() != host {
t.Errorf("%s is set incorrectly!", semconv.ServerAddressName)
}

if spanAttr.Key == semconv.ServerPortName {
portAsInt, _ := strconv.ParseInt(port, 10, 64)
if spanAttr.Value.AsInt64() != portAsInt {
t.Errorf("%s is set incorrectly!", semconv.ServerPortName)
}
}

if spanAttr.Key == semconv.HttpUserAgentOriginalName &&
spanAttr.Value.AsString() != userAgent {
t.Errorf("%s is set incorrectly!", semconv.HttpUserAgentOriginalName)
}
}
}

func Test_InjectTraceContextCorrectly(t *testing.T) {

mockCtx := context.Background()

// Create tracer provider
tp := otel.NewTraceProvider(mockCtx)
defer otel.ShutdownTraceProvider(mockCtx, tp)

// prop := propagation.TraceContext{}

httpMethod := http.MethodPost
httpStatusCode := http.StatusAccepted

// Createa a mock HTTP server
mockServer := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(httpStatusCode)
w.Write([]byte("Test"))
}))
defer mockServer.Close()

httpClient := New(
WithTimeout(time.Duration(10 * time.Second)),
)

req, err := http.NewRequest(
httpMethod,
mockServer.URL,
nil,
)
if err != nil {
t.Fatal(err)
}

res, _ := httpClient.Do(mockCtx, req, fmt.Sprintf("HTTP %s", req.Method))
if res.StatusCode != httpStatusCode {
t.Error("HTTP status code is not the as given.")
}

traceparent := req.Header.Get("Traceparent")
if len(traceparent) == 0 {
t.Error("Span context is not set to the outgoing request.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

func Test_CommonAttributesCreatedSuccessfully(t *testing.T) {
func Test_CommonServerAttributesCreatedSuccessfully(t *testing.T) {
httpMethod := http.MethodPost
host := "localhost"
port := "8080"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type KafkaConsumer struct {
latency metric.Float64Histogram
}

func New() *KafkaConsumer {
func NewKafkaConsumer() *KafkaConsumer {

// Instantiate trace provider
tracer := otel.GetTracerProvider().Tracer(semconv.KafkaConsumerName)
Expand Down
Loading
Loading