Skip to content

Commit

Permalink
feat(wip): implement TopicMessageQuery and `TopicMessageSubmitTrans…
Browse files Browse the repository at this point in the history
…action`
  • Loading branch information
janaakhterov committed Nov 3, 2020
1 parent 02e4ae2 commit 2413273
Show file tree
Hide file tree
Showing 13 changed files with 732 additions and 50 deletions.
53 changes: 12 additions & 41 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sort"
"time"

// "github.com/hashgraph/hedera-sdk-go/proto"
"google.golang.org/grpc"
)

Expand All @@ -27,8 +26,7 @@ type Client struct {
networkNodeIds []node
network map[AccountID]node

mirrorChannels map[string]*grpc.ClientConn
mirrorNetwork []string
mirrorNetwork mirrorNetwork

nextNodeIndex uint
lastSortedNodeAccountIDs int64
Expand Down Expand Up @@ -117,9 +115,7 @@ func newClient(network map[string]AccountID, mirrorNetwork []string) *Client {
networkChannels: make(map[AccountID]*channel),
networkNodeIds: make([]node, 0),
network: newNetwork,
mirrorChannels: make(map[string]*grpc.ClientConn),
mirrorNetwork: make([]string, 0),
nextNodeIndex: 0,
mirrorNetwork: newMirrorNetwork(mirrorNetwork),
lastSortedNodeAccountIDs: time.Now().UTC().UnixNano(),
}

Expand Down Expand Up @@ -239,7 +235,7 @@ func (client *Client) SetNetwork(network map[string]AccountID) *Client {
// SetNetwork replaces all nodes in the Client with a new set of nodes.
// (e.g. for an Address Book update).
func (client *Client) SetMirrorNetwork(mirrorNetwork []string) *Client {
client.mirrorNetwork = mirrorNetwork
client.mirrorNetwork.setNetwork(mirrorNetwork)

return client
}
Expand Down Expand Up @@ -305,40 +301,15 @@ func (client *Client) SetMaxQueryPayment(payment Hbar) *Client {
return client
}

// // Ping sends an AccountBalanceQuery to the specified node returning nil if no
// // problems occur. Otherwise, an error representing the status of the node will
// // be returned.
// func (client *Client) Ping(nodeID AccountID) error {
// node := client.networkNodes[nodeID]
// if node == nil {
// return fmt.Errorf("node with ID %s not registered on this client", nodeID)
// }

// pingQuery := NewAccountBalanceQuery().
// SetAccountID(nodeID)

// pb := pingQuery.QueryBuilder.pb

// resp := new(proto.Response)

// err := node.invoke(methodName(pb), pb, resp)

// if err != nil {
// return newErrPingStatus(err)
// }

// respHeader := mapResponseHeader(resp)

// if respHeader.NodeTransactionPrecheckCode == proto.ResponseCodeEnum_BUSY {
// return newErrPingStatus(fmt.Errorf("%s", Status(respHeader.NodeTransactionPrecheckCode).String()))
// }

// if isResponseUnknown(resp) {
// return newErrPingStatus(fmt.Errorf("unknown"))
// }

// return nil
// }
// Ping sends an AccountBalanceQuery to the specified node returning nil if no
// problems occur. Otherwise, an error representing the status of the node will
// be returned.
func (client *Client) Ping(nodeID AccountID) error {
_, err := NewAccountBalanceQuery().
SetNodeAccountIDs([]AccountID{nodeID}).
Execute(client)
return err
}

func (client *Client) getNextNode() AccountID {
nodeID := client.networkNodeIds[client.nextNodeIndex]
Expand Down
77 changes: 77 additions & 0 deletions mirror_network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package hedera

import (
"math/rand"

"github.com/hashgraph/hedera-sdk-go/proto/mirror"
"google.golang.org/grpc"
)

type mirrorNetwork struct {
channels map[string]mirror.ConsensusServiceClient
network []string
index uint
}

func newMirrorNetwork(network []string) mirrorNetwork {
if len(network) > 0 {
rand.Shuffle(len(network), func(i, j int) {
network[i], network[j] = network[j], network[i]
})
}

return mirrorNetwork{
channels: make(map[string]mirror.ConsensusServiceClient),
network: network,
index: 0,
}
}

func contains(arr []string, str string) bool {
for _, a := range arr {
if a == str {
return true
}
}
return false
}

func (network mirrorNetwork) setNetwork(newNetwork []string) {
for _, n := range network.network {
if !contains(newNetwork, n) {
delete(network.channels, n)
}
}

for _, n := range newNetwork {
if !contains(network.network, n) {
network.network = append(network.network, n)
}
}

network.index = 0

if len(network.network) > 0 {
rand.Shuffle(len(network.network), func(i, j int) {
network.network[i], network.network[j] = network.network[j], network.network[i]
})
}
}

func (network mirrorNetwork) getNextChannel() (mirror.ConsensusServiceClient, error) {
if channel, ok := network.channels[network.network[network.index]]; ok {
network.index = (network.index + 1) % uint(len(network.network))
return channel, nil
}

conn, err := grpc.Dial(network.network[network.index], grpc.WithInsecure())
if err != nil {
return nil, err
}

channel := mirror.NewConsensusServiceClient(conn)

network.channels[network.network[network.index]] = channel
network.index = (network.index + 1) % uint(len(network.network))
return channel, nil
}
13 changes: 13 additions & 0 deletions subscription_handle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package hedera

type SubscriptionHandle struct {
onUnsubscribe func()
}

func newSubscriptionHandle(onUnsubscribe func()) SubscriptionHandle {
return SubscriptionHandle{onUnsubscribe: onUnsubscribe}
}

func (handle SubscriptionHandle) Unsubscribe() {
handle.onUnsubscribe()
}
2 changes: 1 addition & 1 deletion topic_delete_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (transaction *TopicDeleteTransaction) SetTopicID(ID TopicID) *TopicDeleteTr
}

func (transaction *TopicDeleteTransaction) GetTopicID() TopicID {
return TopicIDFromProtobuf(transaction.pb.GetTopicID())
return topicIDFromProtobuf(transaction.pb.GetTopicID())
}

//
Expand Down
2 changes: 1 addition & 1 deletion topic_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (id TopicID) toProtobuf() *proto.TopicID {
}
}

func TopicIDFromProtobuf(pb *proto.TopicID) TopicID {
func topicIDFromProtobuf(pb *proto.TopicID) TopicID {
return TopicID{
Shard: uint64(pb.ShardNum),
Realm: uint64(pb.RealmNum),
Expand Down
61 changes: 61 additions & 0 deletions topic_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package hedera

import (
"time"

"github.com/hashgraph/hedera-sdk-go/proto/mirror"
)

type TopicMessage struct {
ConsensusTimestamp time.Time
Contents []byte
RunningHash []byte
SequenceNumber uint64
Chunks []TopicMessageChunk
TransactionID *TransactionID
}

func topicMessageOfSingle(resp *mirror.ConsensusTopicResponse) TopicMessage {
return TopicMessage{
ConsensusTimestamp: timeFromProtobuf(resp.ConsensusTimestamp),
Contents: resp.Message,
RunningHash: resp.RunningHash,
SequenceNumber: resp.SequenceNumber,
Chunks: nil,
TransactionID: nil,
}
}

func topicMessageOfMany(message []*mirror.ConsensusTopicResponse) TopicMessage {
length := len(message)
size := uint64(0)
chunks := make([]TopicMessageChunk, length)
messages := make([][]byte, length)
var transactionID *TransactionID = nil

for _, m := range message {
if transactionID == nil {
value := transactionIDFromProtobuf(m.ChunkInfo.InitialTransactionID)
transactionID = &value
}

chunks[m.ChunkInfo.Number-1] = newTopicMessageChunk(m)
messages[m.ChunkInfo.Number-1] = m.Message
size += uint64(len(m.Message))
}

finalMessage := make([]byte, 0, size)

for _, m := range messages {
finalMessage = append(finalMessage, m...)
}

return TopicMessage{
ConsensusTimestamp: timeFromProtobuf(message[length-1].ConsensusTimestamp),
RunningHash: message[length-1].RunningHash,
SequenceNumber: message[length-1].SequenceNumber,
Contents: finalMessage,
Chunks: chunks,
TransactionID: transactionID,
}
}
23 changes: 23 additions & 0 deletions topic_message_chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package hedera

import (
"time"

"github.com/hashgraph/hedera-sdk-go/proto/mirror"
)

type TopicMessageChunk struct {
ConsensusTimestamp time.Time
ContentSize uint64
RunningHash []byte
SequenceNumber uint64
}

func newTopicMessageChunk(resp *mirror.ConsensusTopicResponse) TopicMessageChunk {
return TopicMessageChunk{
ConsensusTimestamp: timeFromProtobuf(resp.ConsensusTimestamp),
ContentSize: uint64(len(resp.Message)),
RunningHash: resp.RunningHash,
SequenceNumber: resp.SequenceNumber,
}
}
Loading

0 comments on commit 2413273

Please sign in to comment.