From e07f5b2eb5e3e7373bc225a3af6181ab567525ec Mon Sep 17 00:00:00 2001 From: Lam Tran Date: Mon, 26 Feb 2024 09:37:33 +0700 Subject: [PATCH] fix(accountingservice): fix graceful shutdown and log issues (#1401) Co-authored-by: Pierre Tessier --- src/accountingservice/go.mod | 10 +++++----- src/accountingservice/go.sum | 24 ++++++++++++------------ src/accountingservice/kafka/consumer.go | 17 ++++++++++------- src/accountingservice/main.go | 16 ++++++++++++++-- 4 files changed, 41 insertions(+), 26 deletions(-) diff --git a/src/accountingservice/go.mod b/src/accountingservice/go.mod index 56c970cdf3..a35e4c1e9e 100644 --- a/src/accountingservice/go.mod +++ b/src/accountingservice/go.mod @@ -3,13 +3,13 @@ module github.com/open-telemetry/opentelemetry-demo/src/accountingservice go 1.22 require ( - github.com/IBM/sarama v1.42.1 + github.com/IBM/sarama v1.42.2 github.com/sirupsen/logrus v1.9.3 go.opentelemetry.io/otel v1.23.1 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 go.opentelemetry.io/otel/sdk v1.23.1 go.opentelemetry.io/otel/trace v1.23.1 - google.golang.org/grpc v1.61.0 + google.golang.org/grpc v1.61.1 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 google.golang.org/protobuf v1.32.0 ) @@ -39,9 +39,9 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect go.opentelemetry.io/otel/metric v1.23.1 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect - golang.org/x/crypto v0.18.0 // indirect - golang.org/x/net v0.20.0 // indirect - golang.org/x/sys v0.16.0 // indirect + golang.org/x/crypto v0.19.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sys v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect diff --git a/src/accountingservice/go.sum b/src/accountingservice/go.sum index e150c9ac36..d34a6a953e 100644 --- a/src/accountingservice/go.sum +++ b/src/accountingservice/go.sum @@ -1,5 +1,5 @@ -github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= -github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw= +github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -90,8 +90,8 @@ go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= -golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -99,12 +99,12 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= -golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -112,8 +112,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -134,8 +134,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe h1: google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe h1:bQnxqljG/wqi4NTXu2+DJ3n7APcEA882QZ1JvhQAq9o= google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= -google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= -google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= +google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY= +google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/src/accountingservice/kafka/consumer.go b/src/accountingservice/kafka/consumer.go index 1599a507ae..2b1ab2b490 100644 --- a/src/accountingservice/kafka/consumer.go +++ b/src/accountingservice/kafka/consumer.go @@ -4,8 +4,6 @@ package kafka import ( "context" - "log" - pb "github.com/open-telemetry/opentelemetry-demo/src/accountingservice/genproto/oteldemo" "github.com/IBM/sarama" @@ -19,7 +17,7 @@ var ( GroupID = "accountingservice" ) -func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logger) error { +func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logger) (sarama.ConsumerGroup, error) { saramaConfig := sarama.NewConfig() saramaConfig.Version = ProtocolVersion // So we can know the partition and offset of messages. @@ -28,7 +26,7 @@ func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logge consumerGroup, err := sarama.NewConsumerGroup(brokers, GroupID, saramaConfig) if err != nil { - return err + return nil, err } handler := groupHandler{ @@ -37,9 +35,10 @@ func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logge err = consumerGroup.Consume(ctx, []string{Topic}, &handler) if err != nil { - return err + return nil, err } - return nil + + return consumerGroup, nil } type groupHandler struct { @@ -64,7 +63,11 @@ func (g *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s return err } - log.Printf("Message claimed: orderId = %s, timestamp = %v, topic = %s", orderResult.OrderId, message.Timestamp, message.Topic) + g.log.WithFields(logrus.Fields{ + "orderId": orderResult.OrderId, + "messageTimestamp": message.Timestamp, + "messageTopic": message.Topic, + }).Info("Message claimed") session.MarkMessage(message, "") case <-session.Context().Done(): diff --git a/src/accountingservice/main.go b/src/accountingservice/main.go index 60f966b06d..e2eb7a3dd0 100644 --- a/src/accountingservice/main.go +++ b/src/accountingservice/main.go @@ -13,8 +13,10 @@ import ( "os/signal" "strings" "sync" + "syscall" "time" + "github.com/IBM/sarama" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -85,6 +87,7 @@ func main() { if err := tp.Shutdown(context.Background()); err != nil { log.Printf("Error shutting down tracer provider: %v", err) } + log.Println("Shutdown trace provider") }() var brokers string @@ -93,13 +96,22 @@ func main() { brokerList := strings.Split(brokers, ",") log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", ")) - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGKILL) defer cancel() - if err := kafka.StartConsumerGroup(ctx, brokerList, log); err != nil { + var consumerGroup sarama.ConsumerGroup + if consumerGroup, err = kafka.StartConsumerGroup(ctx, brokerList, log); err != nil { log.Fatal(err) } + defer func() { + if err := consumerGroup.Close(); err != nil { + log.Printf("Error closing consumer group: %v", err) + } + log.Println("Closed consumer group") + }() <-ctx.Done() + + log.Println("Accounting service exited") } func mustMapEnv(target *string, envKey string) {