Skip to content

Commit

Permalink
Implement error generation
Browse files Browse the repository at this point in the history
  • Loading branch information
utr1903 committed Feb 8, 2024
1 parent 6b3647c commit 5ef035d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 8 deletions.
6 changes: 6 additions & 0 deletions apps/golang/commons/dtos/dto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package dtos

// CreateRequestDto is the JSON payload exchanged between the simulator's
// Kafka producer and the Kafka consumer.
type CreateRequestDto struct {
	// Name is the user name carried in the message.
	Name string `json:"name"`
	// Error optionally names a simulated error type to be triggered by the
	// consumer (e.g. "tableDoesNotExistError"); empty means no error.
	Error string `json:"error"`
}
57 changes: 53 additions & 4 deletions apps/golang/kafkaconsumer/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package consumer

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"

"github.com/IBM/sarama"
"github.com/sirupsen/logrus"
"github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/dtos"
"github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/logger"
"github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/mysql"
otelkafka "github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/otel/kafka"
Expand Down Expand Up @@ -204,12 +206,19 @@ func (g *groupHandler) consumeMessage(
consumeFunc := func(ctx context.Context) error {

// Parse name out of the message
name := string(msg.Value)
body, err := g.parseMessageBody(ctx, msg.Value)
if err != nil {
g.logger.Log(logrus.ErrorLevel, ctx, "", "Consuming message is failed.")
return err
}

name := body.Name
errType := body.Error

g.logger.Log(logrus.InfoLevel, ctx, name, "Consuming message...")

// Store it into db
err := g.storeIntoDb(ctx, name)
err = g.storeIntoDb(ctx, name, errType)
if err != nil {
g.logger.Log(logrus.ErrorLevel, ctx, name, "Consuming message is failed.")
return err
Expand All @@ -229,16 +238,56 @@ func (g *groupHandler) consumeMessage(
return nil
}

// parseMessageBody deserializes a raw Kafka message payload into a
// CreateRequestDto, recording the work in a dedicated "parse dto" span.
//
// Returns the parsed dto, or an error if the payload is not valid JSON for
// CreateRequestDto; the error is also logged and attached to the span.
func (g *groupHandler) parseMessageBody(
	ctx context.Context,
	messageBody []byte,
) (
	*dtos.CreateRequestDto,
	error,
) {

	// Start the parsing span as a child of the consumer span carried in ctx.
	parentSpan := trace.SpanFromContext(ctx)
	ctx, parseSpan := parentSpan.TracerProvider().
		Tracer(semconv.KafkaConsumerName).
		Start(
			ctx,
			"parse dto",
			trace.WithSpanKind(trace.SpanKindInternal),
		)
	defer parseSpan.End()

	g.logger.Log(logrus.InfoLevel, ctx, "", "Parsing dto...")

	// BUG FIX: json.Unmarshal requires a non-nil pointer. The previous code
	// declared `var dto *dtos.CreateRequestDto` (nil) and passed it directly,
	// which makes Unmarshal fail with an InvalidUnmarshalError for every
	// message. Unmarshal into a value and return its address instead.
	var dto dtos.CreateRequestDto
	err := json.Unmarshal(messageBody, &dto)
	if err != nil {
		msg := "Parsing dto failed."
		g.logger.Log(logrus.ErrorLevel, ctx, "", msg)
		g.addErrorToSpan(parseSpan, msg, err)
		return nil, err
	}

	g.logger.Log(logrus.InfoLevel, ctx, dto.Name, "Parsing dto succeeded.")
	return &dto, nil
}

func (g *groupHandler) storeIntoDb(
ctx context.Context,
name string,
errType string,
) error {

g.logger.Log(logrus.InfoLevel, ctx, name, "Storing into DB...")

// Build db query
// Create table does not exist error
var dbStatement string
dbOperation := "INSERT"
dbStatement := dbOperation + " INTO " + g.MySql.Opts.Table + " (name) VALUES (?)"
if errType == "tableDoesNotExistError" {
dbStatement = dbOperation + " INTO " + "faketable" + " (name) VALUES (?)"
} else {
dbStatement = dbOperation + " INTO " + g.MySql.Opts.Table + " (name) VALUES (?)"
}

// Create database span
parentSpan := trace.SpanFromContext(ctx)
Expand Down
44 changes: 40 additions & 4 deletions apps/golang/simulator/kafkaproducer/kafkaproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@ package kafkaproducer

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"strconv"
"time"

"github.com/IBM/sarama"
"github.com/sirupsen/logrus"
"github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/dtos"
"github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/logger"

otelkafka "github.com/utr1903/opentelemetry-kubernetes-demo/apps/golang/commons/otel/kafka"
)

var (
	// randomErrors maps a roll of the randomizer to the error type injected
	// into the outgoing message body (CreateRequestDto.Error). The consumer
	// reacts to these names; e.g. "tableDoesNotExistError" makes it INSERT
	// into a non-existent table to simulate a database failure.
	randomErrors = map[int]string{
		1: "databaseConnectionError",
		2: "tableDoesNotExistError",
		3: "preprocessingException",
		4: "schemaNotFoundInCacheWarning",
	}
)

type Opts struct {
ServiceName string
RequestInterval int64
Expand Down Expand Up @@ -144,25 +155,50 @@ func (k *KafkaConsumerSimulator) publishMessages(
// Keep publishing messages
for {
func() {
// Inject tracing info into message
ctx := context.Background()

// Make request after each interval
time.Sleep(time.Duration(k.Opts.RequestInterval) * time.Millisecond)

// Get a random user
user := users[k.Randomizer.Intn(len(users))]

// Create message
body, err := k.createMessageBody(user)
if err != nil {
k.logger.Log(logrus.ErrorLevel, ctx, user, "Creating message body failed:"+err.Error())
return
}
msg := sarama.ProducerMessage{
Topic: k.Opts.BrokerTopic,
Value: sarama.ByteEncoder([]byte(user)),
Value: sarama.ByteEncoder(body),
}

// Inject tracing info into message
ctx := context.Background()

// Publish message
k.logger.Log(logrus.InfoLevel, ctx, user, "Publishing message...")
otelproducer.Publish(ctx, &msg)
k.logger.Log(logrus.InfoLevel, ctx, user, "Message published successfully.")
}()
}
}

// createMessageBody builds the JSON-encoded message body for a user.
//
// The dto always carries the user name; with probability 4/15 it also
// carries one of the predefined error types from randomErrors, which the
// consumer uses to simulate failures downstream.
//
// Returns the marshaled body, or an error if JSON encoding fails.
func (k *KafkaConsumerSimulator) createMessageBody(
	user string,
) (
	[]byte,
	error,
) {

	dto := &dtos.CreateRequestDto{
		Name: user,
	}

	// Roll a number in [0, 15); randomErrors has exactly the keys 1-4, so
	// the comma-ok lookup injects an error type with probability 4/15.
	// Using the lookup instead of hard-coding `randomNum == 1 || ... == 4`
	// keeps this in sync with randomErrors if entries are ever added.
	if errType, ok := randomErrors[k.Randomizer.Intn(15)]; ok {
		dto.Error = errType
	}

	return json.Marshal(dto)
}

0 comments on commit 5ef035d

Please sign in to comment.