Skip to content

Commit

Permalink
feat: Adding TopicMessageQuery error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
andrix10 committed Dec 29, 2020
1 parent 1f58865 commit c08b4cf
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions topic_message_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package hedera

import (
"context"

"math"
"sync"
"time"
Expand All @@ -13,13 +12,15 @@ import (
)

type TopicMessageQuery struct {
pb *mirror.ConsensusTopicQuery
pb *mirror.ConsensusTopicQuery
errorHandler func(stat status.Status)
}

func NewTopicMessageQuery() *TopicMessageQuery {
pb := mirror.ConsensusTopicQuery{}
return &TopicMessageQuery{
pb: &pb,
pb: &pb,
errorHandler: nil,
}
}

Expand Down Expand Up @@ -71,6 +72,11 @@ func (query *TopicMessageQuery) GetLimit() uint64 {
return query.pb.Limit
}

func (query *TopicMessageQuery) SetErrorHandler(errorHandler func(stat status.Status)) *TopicMessageQuery {
query.errorHandler = errorHandler
return query
}

func (query *TopicMessageQuery) Subscribe(client *Client, onNext func(TopicMessage)) (SubscriptionHandle, error) {
ctx, cancel := context.WithCancel(context.TODO())
handle := newSubscriptionHandle(cancel)
Expand All @@ -85,14 +91,22 @@ func (query *TopicMessageQuery) Subscribe(client *Client, onNext func(TopicMessa
resubscribe := true
channel, err := client.mirrorNetwork.getNextMirrorNode().getChannel()
if err != nil {
panic(err)
cancel()
grpcErr, ok := status.FromError(err)
if query.errorHandler == nil && ok {
query.errorHandler(*grpcErr)
}
}

for {
if resubscribe {
subClient, err = (*channel).SubscribeTopic(ctx, query.pb)
if err != nil {
panic(err)
cancel()
grpcErr, ok := status.FromError(err)
if query.errorHandler == nil && ok {
query.errorHandler(*grpcErr)
}
}
}

Expand All @@ -112,6 +126,10 @@ func (query *TopicMessageQuery) Subscribe(client *Client, onNext func(TopicMessa
break
} else {
cancel()
grpcErr, ok := status.FromError(err)
if query.errorHandler == nil && ok {
query.errorHandler(*grpcErr)
}
break
}
}
Expand Down

0 comments on commit c08b4cf

Please sign in to comment.