diff --git a/.github/golangci.yaml b/.github/golangci.yaml index 68e0893e25c..1848e65a64c 100644 --- a/.github/golangci.yaml +++ b/.github/golangci.yaml @@ -7,6 +7,8 @@ run: modules-download-mode: readonly allow-parallel-runners: false go: "" + skip-dirs: # Temporary + - core output: uniq-by-line: false @@ -77,7 +79,6 @@ linters: - goimports # Unused imports - goconst # Repeated strings that could be replaced by a constant - dogsled # Checks assignments with too many blank identifiers (e.g. x, , , _, := f()) - - dupl # Code clone detection - errname # Checks that sentinel errors are prefixed with the Err and error types are suffixed with the Error - errorlint # errorlint is a linter for that can be used to find code that will cause problems with the error wrapping scheme introduced in Go 1.13 - unused # Checks Go code for unused constants, variables, functions and types diff --git a/Makefile b/Makefile index c2e10cd79f2..d130a5a869b 100644 --- a/Makefile +++ b/Makefile @@ -16,4 +16,4 @@ fixalign: protoc: # Make sure the following prerequisites are installed before running these commands: # https://grpc.io/docs/languages/go/quickstart/#prerequisites - protoc --proto_path=./ --go_out=./ --go-grpc_out=./ ./messages/proto/*.proto + protoc --go_out=./ ./messages/proto/*.proto diff --git a/core/message_queue.go b/core/message_queue.go deleted file mode 100644 index d393444729b..00000000000 --- a/core/message_queue.go +++ /dev/null @@ -1,44 +0,0 @@ -package core - -import "sort" - -// MessageQueue represents a message queue that maintains messages sorted by state -// This is helpful when some nodes are ahead (in terms of state) of others -type MessageQueue struct { - messages []*Msg -} - -func NewMessageQueue() *MessageQueue { - return &MessageQueue{ - messages: make([]*Msg, 0), - } -} - -// AddMessage adds a message to the queue while maintaining the sorted order by state -func (mq *MessageQueue) AddMessage(msg *Msg) { - index := sort.Search(len(mq.messages), func(i int) bool { - return mq.messages[i].state >= msg.state - }) - - mq.messages = append(mq.messages[:index], append([]*Msg{msg}, mq.messages[index:]...)...) -} - -// PopMessage pops and returns the next message that should be processed -func (mq *MessageQueue) PopMessage() *Msg { - if len(mq.messages) == 0 { - return nil - } - // The next message to be processed is the one with the lowest state - return mq.popMessageByState(mq.messages[0].state) -} - -func (mq *MessageQueue) popMessageByState(state State) *Msg { - for i, msg := range mq.messages { - if msg.state == state { - result := msg - mq.messages = append(mq.messages[:i], mq.messages[i+1:]...) - return result - } - } - return nil -} diff --git a/core/message_queue_test.go b/core/message_queue_test.go deleted file mode 100644 index f0b59649a03..00000000000 --- a/core/message_queue_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package core - -import "testing" - -func TestMessageQueue_ConsensusIntegration(t *testing.T) { - // Create a new instance of MessageQueue - messageQueue := NewMessageQueue() - - // Define messages from multiple sources with different states - message1 := &Msg{ - state: Propose, - height: 0, - round: 0, - block: []byte{1, 2, 3}, - from: []byte{0, 1, 2}, - } - message2 := &Msg{ - state: Prevote, - height: 0, - round: 0, - block: []byte{1, 2, 3}, - from: []byte{0, 1, 2}, - } - message3 := &Msg{ - state: Precommit, - height: 0, - round: 0, - block: []byte{1, 2, 3}, - from: []byte{0, 1, 2}, - } - - // Add messages to the queue - messageQueue.AddMessage(message2) - messageQueue.AddMessage(message3) - messageQueue.AddMessage(message1) - - // Create a new instance of Tendermint (consensus algorithm) - tendermint := NewTendermint(&Config{ - ctx: nil, - height: 0, - p: &prMock{getFunc: func(height uint64, block []byte, round int64) []byte { - return []byte{0, 1, 2} - }, me: []byte{0, 1, 2}}, - b: &brMock{}, - timeout: 0, - block: nil, - bv: &blMock{ - isMajority: true, - validBlocks: map[uint64]bool{ - 1: true, // Height 1 is valid - }, - }, - }) - - err := tendermint.Init() - - if err != nil { - t.Errorf("Error init: %v", err) - } - - var done bool - - // Process messages from the queue using the consensus algorithm - for { - // Pop the next message from the queue - message := messageQueue.PopMessage() - if message == nil { - // No more messages in the queue, exit the loop - break - } - - err, done = tendermint.ProcessMsg(message) - if err != nil { - t.Errorf("Error processing message: %v", err) - } - } - - if !done { - t.Error("Consensus should have been reached") - } -} diff --git a/go.mod b/go.mod index e5c4c497bf1..e31c398cf50 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,14 @@ module github.com/gnolang/go-tendermint go 1.21 -require google.golang.org/protobuf v1.32.0 +require ( + github.com/rs/xid v1.5.0 + github.com/stretchr/testify v1.8.4 + google.golang.org/protobuf v1.32.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 5361bc77f3e..8495da99566 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,18 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/messages/collector.go b/messages/collector.go new file mode 100644 index 00000000000..baed2deac52 --- /dev/null +++ b/messages/collector.go @@ -0,0 +1,117 @@ +package messages + +import ( + "fmt" + "sync" +) + +// msgType is the combined message type interface, +// for easy reference and type safety +type msgType interface { + ProposalMessage | PrevoteMessage | PrecommitMessage +} + +type ( + // collection are the actual received messages. + // Maps a unique identifier -> their message (of a specific type) to avoid duplicates. + // Identifiers are derived from . + // Each validator in the consensus needs to send at most 1 message of every type + // (minus the PROPOSAL, which is only sent by the proposer), + // so the message system needs to keep track of only 1 message per type, per validator, per view + collection[T msgType] map[string]*T +) + +// Collector is a single message type collector +type Collector[T msgType] struct { + collection collection[T] // the message storage + subscriptions subscriptions[T] // the active message subscriptions + + collectionMux sync.RWMutex + subscriptionsMux sync.RWMutex +} + +// NewCollector creates a new message collector +func NewCollector[T msgType]() *Collector[T] { + return &Collector[T]{ + collection: make(collection[T]), + subscriptions: make(subscriptions[T]), + } +} + +// Subscribe creates a new collector subscription. +// Returns the channel for receiving messages, +// as well as the unsubscribe method +func (c *Collector[T]) Subscribe() (<-chan MsgCallback[T], func()) { + c.subscriptionsMux.Lock() + defer c.subscriptionsMux.Unlock() + + // Create a new subscription + id, ch := c.subscriptions.add() + + // Create the unsubscribe callback + unsubscribeFn := func() { + c.subscriptionsMux.Lock() + defer c.subscriptionsMux.Unlock() + + c.subscriptions.remove(id) + } + + // Notify the subscription immediately, + // since there can be existing messages in the collection. + // This action assumes the channel is not blocking (created with initial size), + // since the calling context does not have access to it yet at this point + notifySubscription(ch, c.GetMessages) + + return ch, unsubscribeFn +} + +// GetMessages returns the currently present messages in the collector +func (c *Collector[T]) GetMessages() []*T { + c.collectionMux.RLock() + defer c.collectionMux.RUnlock() + + // Fetch the messages in the collection + return c.collection.getMessages() +} + +// getMessages fetches the messages in the collection +func (c *collection[T]) getMessages() []*T { + messages := make([]*T, 0, len(*c)) + + for _, senderMessage := range *c { + messages = append(messages, senderMessage) + } + + return messages +} + +// AddMessage adds a new message to the collector +func (c *Collector[T]) AddMessage(view *View, from []byte, message *T) { + c.collectionMux.Lock() + + // Add the message + c.collection.addMessage( + getCollectionKey(from, view), + message, + ) + + c.collectionMux.Unlock() + + // Notify the subscriptions + c.subscriptionsMux.RLock() + defer c.subscriptionsMux.RUnlock() + + c.subscriptions.notify(c.GetMessages) +} + +// addMessage adds a new message to the collection +func (c *collection[T]) addMessage(key string, message *T) { + (*c)[key] = message +} + +// getCollectionKey constructs a key based on the +// message sender and view information. +// This key guarantees uniqueness in the message store +func getCollectionKey(from []byte, view *View) string { + return fmt.Sprintf("%s_%d_%d", from, view.Height, view.Round) +} diff --git a/messages/collector_test.go b/messages/collector_test.go new file mode 100644 index 00000000000..fdcc23156c6 --- /dev/null +++ b/messages/collector_test.go @@ -0,0 +1,394 @@ +package messages + +import ( + "sort" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// generateMessages generates dummy messages +// for the given view and type +func generateMessages( + t *testing.T, + count int, + view *View, + messageTypes ...MessageType, +) []*Message { + t.Helper() + + messages := make([]*Message, 0, count) + + for index := 0; index < count; index++ { + for _, messageType := range messageTypes { + message := &Message{ + Type: messageType, + } + + switch messageType { + case MessageType_PROPOSAL: + message.Payload = &Message_ProposalMessage{ + ProposalMessage: &ProposalMessage{ + From: []byte(strconv.Itoa(index)), + View: view, + }, + } + case MessageType_PREVOTE: + message.Payload = &Message_PrevoteMessage{ + PrevoteMessage: &PrevoteMessage{ + From: []byte(strconv.Itoa(index)), + View: view, + }, + } + case MessageType_PRECOMMIT: + message.Payload = &Message_PrecommitMessage{ + PrecommitMessage: &PrecommitMessage{ + From: []byte(strconv.Itoa(index)), + View: view, + }, + } + } + + messages = append(messages, message) + } + } + + return messages +} + +func TestCollector_AddMessage(t *testing.T) { + t.Parallel() + + t.Run("empty message queue", func(t *testing.T) { + t.Parallel() + + // Create the collector + c := NewCollector[ProposalMessage]() + + // Fetch the messages + messages := c.GetMessages() + + require.NotNil(t, messages) + assert.Len(t, messages, 0) + }) + + t.Run("valid PROPOSAL messages fetched", func(t *testing.T) { + t.Parallel() + + var ( + count = 5 + initialView = &View{ + Height: 1, + Round: 0, + } + ) + + // Create the collector + c := NewCollector[ProposalMessage]() + + generatedMessages := generateMessages( + t, + count, + initialView, + MessageType_PROPOSAL, + ) + + expectedMessages := make([]*ProposalMessage, 0, count) + + for _, message := range generatedMessages { + proposal, ok := message.Payload.(*Message_ProposalMessage) + require.True(t, ok) + + c.AddMessage(proposal.ProposalMessage.View, proposal.ProposalMessage.From, proposal.ProposalMessage) + + expectedMessages = append(expectedMessages, proposal.ProposalMessage) + } + + // Sort the messages for the test + sort.SliceStable(expectedMessages, func(i, j int) bool { + return string(expectedMessages[i].From) < string(expectedMessages[j].From) + }) + + // Get the messages from the store + messages := c.GetMessages() + + // Sort the messages for the test + sort.SliceStable(messages, func(i, j int) bool { + return string(messages[i].From) < string(messages[j].From) + }) + + // Make sure the messages match + assert.Equal(t, expectedMessages, messages) + }) + + t.Run("valid PREVOTE messages fetched", func(t *testing.T) { + t.Parallel() + + var ( + count = 5 + initialView = &View{ + Height: 1, + Round: 0, + } + ) + + // Create the collector + c := NewCollector[PrevoteMessage]() + + generatedMessages := generateMessages( + t, + count, + initialView, + MessageType_PREVOTE, + ) + + expectedMessages := make([]*PrevoteMessage, 0, count) + + for _, message := range generatedMessages { + prevote, ok := message.Payload.(*Message_PrevoteMessage) + require.True(t, ok) + + c.AddMessage(prevote.PrevoteMessage.View, prevote.PrevoteMessage.From, prevote.PrevoteMessage) + + expectedMessages = append(expectedMessages, prevote.PrevoteMessage) + } + + // Sort the messages for the test + sort.SliceStable(expectedMessages, func(i, j int) bool { + return string(expectedMessages[i].From) < string(expectedMessages[j].From) + }) + + // Get the messages from the store + messages := c.GetMessages() + + // Sort the messages for the test + sort.SliceStable(messages, func(i, j int) bool { + return string(messages[i].From) < string(messages[j].From) + }) + + // Make sure the messages match + assert.Equal(t, expectedMessages, messages) + }) + + t.Run("valid PRECOMMIT messages fetched", func(t *testing.T) { + t.Parallel() + + var ( + count = 5 + initialView = &View{ + Height: 1, + Round: 0, + } + ) + + // Create the collector + c := NewCollector[PrecommitMessage]() + + generatedMessages := generateMessages( + t, + count, + initialView, + MessageType_PRECOMMIT, + ) + + expectedMessages := make([]*PrecommitMessage, 0, count) + + for _, message := range generatedMessages { + precommit, ok := message.Payload.(*Message_PrecommitMessage) + require.True(t, ok) + + c.AddMessage(precommit.PrecommitMessage.View, precommit.PrecommitMessage.From, precommit.PrecommitMessage) + + expectedMessages = append(expectedMessages, precommit.PrecommitMessage) + } + + // Sort the messages for the test + sort.SliceStable(expectedMessages, func(i, j int) bool { + return string(expectedMessages[i].From) < string(expectedMessages[j].From) + }) + + // Get the messages from the store + messages := c.GetMessages() + + // Sort the messages for the test + sort.SliceStable(messages, func(i, j int) bool { + return string(messages[i].From) < string(messages[j].From) + }) + + // Make sure the messages match + assert.Equal(t, expectedMessages, messages) + }) +} + +func TestCollector_AddDuplicateMessages(t *testing.T) { + t.Parallel() + + var ( + count = 5 + commonSender = []byte("sender 1") + commonType = MessageType_PREVOTE + view = &View{ + Height: 1, + Round: 1, + } + ) + + // Create the collector + c := NewCollector[PrevoteMessage]() + + generatedMessages := generateMessages( + t, + count, + view, + commonType, + ) + + for _, message := range generatedMessages { + prevote, ok := message.Payload.(*Message_PrevoteMessage) + require.True(t, ok) + + // Make sure each message is from the same sender + prevote.PrevoteMessage.From = commonSender + + c.AddMessage(prevote.PrevoteMessage.View, prevote.PrevoteMessage.From, prevote.PrevoteMessage) + } + + // Check that only 1 message has been added + assert.Len(t, c.GetMessages(), 1) +} + +func TestCollector_Subscribe(t *testing.T) { + t.Parallel() + + t.Run("subscribe with pre-existing messages", func(t *testing.T) { + t.Parallel() + + var ( + count = 100 + view = &View{ + Height: 1, + Round: 0, + } + ) + + // Create the collector + c := NewCollector[PrevoteMessage]() + + generatedMessages := generateMessages( + t, + count, + view, + MessageType_PREVOTE, + ) + + expectedMessages := make([]*PrevoteMessage, 0, count) + + for _, message := range generatedMessages { + prevote, ok := message.Payload.(*Message_PrevoteMessage) + require.True(t, ok) + + c.AddMessage(prevote.PrevoteMessage.View, prevote.PrevoteMessage.From, prevote.PrevoteMessage) + + expectedMessages = append(expectedMessages, prevote.PrevoteMessage) + } + + // Create a subscription + notifyCh, unsubscribeFn := c.Subscribe() + defer unsubscribeFn() + + var messages []*PrevoteMessage + + select { + case callback := <-notifyCh: + messages = callback() + case <-time.After(5 * time.Second): + } + + // Sort the messages for the test + sort.SliceStable(expectedMessages, func(i, j int) bool { + return string(expectedMessages[i].From) < string(expectedMessages[j].From) + }) + + // Sort the messages for the test + sort.SliceStable(messages, func(i, j int) bool { + return string(messages[i].From) < string(messages[j].From) + }) + + // Make sure the messages match + assert.Equal(t, expectedMessages, messages) + }) + + t.Run("subscribe with no pre-existing messages", func(t *testing.T) { + t.Parallel() + + var ( + count = 100 + view = &View{ + Height: 1, + Round: 0, + } + ) + + // Create the collector + c := NewCollector[PrevoteMessage]() + + generatedMessages := generateMessages( + t, + count, + view, + MessageType_PREVOTE, + ) + + expectedMessages := make([]*PrevoteMessage, 0, count) + + // Create a subscription + notifyCh, unsubscribeFn := c.Subscribe() + defer unsubscribeFn() + + for _, message := range generatedMessages { + prevote, ok := message.Payload.(*Message_PrevoteMessage) + require.True(t, ok) + + c.AddMessage(prevote.PrevoteMessage.View, prevote.PrevoteMessage.From, prevote.PrevoteMessage) + + expectedMessages = append(expectedMessages, prevote.PrevoteMessage) + } + + var ( + messages []*PrevoteMessage + + wg sync.WaitGroup + ) + + wg.Add(1) + + go func() { + defer wg.Done() + + select { + case callback := <-notifyCh: + messages = callback() + case <-time.After(5 * time.Second): + } + }() + + wg.Wait() + + // Sort the messages for the test + sort.SliceStable(expectedMessages, func(i, j int) bool { + return string(expectedMessages[i].From) < string(expectedMessages[j].From) + }) + + // Sort the messages for the test + sort.SliceStable(messages, func(i, j int) bool { + return string(messages[i].From) < string(messages[j].From) + }) + + // Make sure the messages match + assert.Equal(t, expectedMessages, messages) + }) +} diff --git a/messages/proto/messages.pb.go b/messages/messages.pb.go similarity index 69% rename from messages/proto/messages.pb.go rename to messages/messages.pb.go index 9d18d54322a..479b1532dd0 100644 --- a/messages/proto/messages.pb.go +++ b/messages/messages.pb.go @@ -1,10 +1,10 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v4.25.2 +// protoc v4.25.3 // source: messages/proto/messages.proto -package proto +package messages import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -138,17 +138,16 @@ type Message struct { // type is the type of consensus message Type MessageType `protobuf:"varint,1,opt,name=type,proto3,enum=MessageType" json:"type,omitempty"` - // view is the current view for the message - // (the view in which the message was sent) - View *View `protobuf:"bytes,2,opt,name=view,proto3" json:"view,omitempty"` + // the message signature of the sender, if any + Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"` // data is the underlying wrapped message // - // Types that are assignable to Data: + // Types that are assignable to Payload: // // *Message_ProposalMessage // *Message_PrevoteMessage // *Message_PrecommitMessage - Data isMessage_Data `protobuf_oneof:"data"` + Payload isMessage_Payload `protobuf_oneof:"payload"` } func (x *Message) Reset() { @@ -190,43 +189,43 @@ func (x *Message) GetType() MessageType { return MessageType_PROPOSAL } -func (x *Message) GetView() *View { +func (x *Message) GetSignature() []byte { if x != nil { - return x.View + return x.Signature } return nil } -func (m *Message) GetData() isMessage_Data { +func (m *Message) GetPayload() isMessage_Payload { if m != nil { - return m.Data + return m.Payload } return nil } func (x *Message) GetProposalMessage() *ProposalMessage { - if x, ok := x.GetData().(*Message_ProposalMessage); ok { + if x, ok := x.GetPayload().(*Message_ProposalMessage); ok { return x.ProposalMessage } return nil } func (x *Message) GetPrevoteMessage() *PrevoteMessage { - if x, ok := x.GetData().(*Message_PrevoteMessage); ok { + if x, ok := x.GetPayload().(*Message_PrevoteMessage); ok { return x.PrevoteMessage } return nil } func (x *Message) GetPrecommitMessage() *PrecommitMessage { - if x, ok := x.GetData().(*Message_PrecommitMessage); ok { + if x, ok := x.GetPayload().(*Message_PrecommitMessage); ok { return x.PrecommitMessage } return nil } -type isMessage_Data interface { - isMessage_Data() +type isMessage_Payload interface { + isMessage_Payload() } type Message_ProposalMessage struct { @@ -241,11 +240,11 @@ type Message_PrecommitMessage struct { PrecommitMessage *PrecommitMessage `protobuf:"bytes,5,opt,name=precommitMessage,proto3,oneof"` } -func (*Message_ProposalMessage) isMessage_Data() {} +func (*Message_ProposalMessage) isMessage_Payload() {} -func (*Message_PrevoteMessage) isMessage_Data() {} +func (*Message_PrevoteMessage) isMessage_Payload() {} -func (*Message_PrecommitMessage) isMessage_Data() {} +func (*Message_PrecommitMessage) isMessage_Payload() {} // ProposalMessage is the message containing // the consensus proposal for the view @@ -255,6 +254,11 @@ type ProposalMessage struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + // view is the current view for the message + // (the view in which the message was sent) + View *View `protobuf:"bytes,1,opt,name=view,proto3" json:"view,omitempty"` + // from is the message sender (unique identifier) + From []byte `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"` // proposal is the actual consensus proposal Proposal []byte `protobuf:"bytes,3,opt,name=proposal,proto3" json:"proposal,omitempty"` // proposalRound is the round associated with the @@ -296,6 +300,20 @@ func (*ProposalMessage) Descriptor() ([]byte, []int) { return file_messages_proto_messages_proto_rawDescGZIP(), []int{2} } +func (x *ProposalMessage) GetView() *View { + if x != nil { + return x.View + } + return nil +} + +func (x *ProposalMessage) GetFrom() []byte { + if x != nil { + return x.From + } + return nil +} + func (x *ProposalMessage) GetProposal() []byte { if x != nil { return x.Proposal @@ -322,10 +340,15 @@ type PrevoteMessage struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + // view is the current view for the message + // (the view in which the message was sent) + View *View `protobuf:"bytes,1,opt,name=view,proto3" json:"view,omitempty"` + // from is the message sender (unique identifier) + From []byte `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"` // identifier is the unique identifier for // the proposal associated with this // prevote message (ex. proposal hash) - Identifier []byte `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"` + Identifier []byte `protobuf:"bytes,3,opt,name=identifier,proto3" json:"identifier,omitempty"` } func (x *PrevoteMessage) Reset() { @@ -360,6 +383,20 @@ func (*PrevoteMessage) Descriptor() ([]byte, []int) { return file_messages_proto_messages_proto_rawDescGZIP(), []int{3} } +func (x *PrevoteMessage) GetView() *View { + if x != nil { + return x.View + } + return nil +} + +func (x *PrevoteMessage) GetFrom() []byte { + if x != nil { + return x.From + } + return nil +} + func (x *PrevoteMessage) GetIdentifier() []byte { if x != nil { return x.Identifier @@ -378,10 +415,15 @@ type PrecommitMessage struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + // view is the current view for the message + // (the view in which the message was sent) + View *View `protobuf:"bytes,1,opt,name=view,proto3" json:"view,omitempty"` + // from is the message sender (unique identifier) + From []byte `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"` // identifier is the unique identifier for // the proposal associated with this // precommit message (ex. proposal hash) - Identifier []byte `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"` + Identifier []byte `protobuf:"bytes,3,opt,name=identifier,proto3" json:"identifier,omitempty"` } func (x *PrecommitMessage) Reset() { @@ -416,6 +458,20 @@ func (*PrecommitMessage) Descriptor() ([]byte, []int) { return file_messages_proto_messages_proto_rawDescGZIP(), []int{4} } +func (x *PrecommitMessage) GetView() *View { + if x != nil { + return x.View + } + return nil +} + +func (x *PrecommitMessage) GetFrom() []byte { + if x != nil { + return x.From + } + return nil +} + func (x *PrecommitMessage) GetIdentifier() []byte { if x != nil { return x.Identifier @@ -431,40 +487,49 @@ var file_messages_proto_messages_proto_rawDesc = []byte{ 0x34, 0x0a, 0x04, 0x56, 0x69, 0x65, 0x77, 0x12, 0x16, 0x0a, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, - 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x22, 0x88, 0x02, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x22, 0x8e, 0x02, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x20, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, - 0x79, 0x70, 0x65, 0x12, 0x19, 0x0a, 0x04, 0x76, 0x69, 0x65, 0x77, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x05, 0x2e, 0x56, 0x69, 0x65, 0x77, 0x52, 0x04, 0x76, 0x69, 0x65, 0x77, 0x12, 0x3c, - 0x0a, 0x0f, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, - 0x61, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x0f, 0x70, 0x72, 0x6f, - 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x39, 0x0a, 0x0e, - 0x70, 0x72, 0x65, 0x76, 0x6f, 0x74, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x50, 0x72, 0x65, 0x76, 0x6f, 0x74, 0x65, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x72, 0x65, 0x76, 0x6f, 0x74, 0x65, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3f, 0x0a, 0x10, 0x70, 0x72, 0x65, 0x63, 0x6f, - 0x6d, 0x6d, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x72, 0x65, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x10, 0x70, 0x72, 0x65, 0x63, 0x6f, 0x6d, 0x6d, 0x69, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x22, 0x53, 0x0a, 0x0f, 0x50, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x12, - 0x24, 0x0a, 0x0d, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x52, 0x6f, 0x75, 0x6e, 0x64, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, - 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x22, 0x30, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x76, 0x6f, 0x74, 0x65, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x64, 0x65, 0x6e, 0x74, - 0x69, 0x66, 0x69, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x69, 0x64, 0x65, - 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x22, 0x32, 0x0a, 0x10, 0x50, 0x72, 0x65, 0x63, 0x6f, - 0x6d, 0x6d, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x69, - 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x0a, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x2a, 0x37, 0x0a, 0x0b, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x52, - 0x4f, 0x50, 0x4f, 0x53, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x52, 0x45, 0x56, - 0x4f, 0x54, 0x45, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x50, 0x52, 0x45, 0x43, 0x4f, 0x4d, 0x4d, - 0x49, 0x54, 0x10, 0x02, 0x42, 0x11, 0x5a, 0x0f, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x12, 0x3c, 0x0a, 0x0f, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x50, 0x72, 0x6f, + 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x0f, + 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, + 0x39, 0x0a, 0x0e, 0x70, 0x72, 0x65, 0x76, 0x6f, 0x74, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x50, 0x72, 0x65, 0x76, 0x6f, 0x74, + 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x72, 0x65, 0x76, + 0x6f, 0x74, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3f, 0x0a, 0x10, 0x70, 0x72, + 0x65, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x50, 0x72, 0x65, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x10, 0x70, 0x72, 0x65, 0x63, 0x6f, + 0x6d, 0x6d, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x70, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x82, 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x6f, 0x70, 0x6f, + 0x73, 0x61, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x04, 0x76, 0x69, + 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x05, 0x2e, 0x56, 0x69, 0x65, 0x77, 0x52, + 0x04, 0x76, 0x69, 0x65, 0x77, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, + 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x70, 0x72, 0x6f, + 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x12, 0x24, 0x0a, 0x0d, 0x70, 0x72, 0x6f, 0x70, 0x6f, 0x73, 0x61, + 0x6c, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x70, 0x72, + 0x6f, 0x70, 0x6f, 0x73, 0x61, 0x6c, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x22, 0x5f, 0x0a, 0x0e, 0x50, + 0x72, 0x65, 0x76, 0x6f, 0x74, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, + 0x04, 0x76, 0x69, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x05, 0x2e, 0x56, 0x69, + 0x65, 0x77, 0x52, 0x04, 0x76, 0x69, 0x65, 0x77, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, + 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x0a, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x22, 0x61, 0x0a, 0x10, + 0x50, 0x72, 0x65, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x12, 0x19, 0x0a, 0x04, 0x76, 0x69, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x05, + 0x2e, 0x56, 0x69, 0x65, 0x77, 0x52, 0x04, 0x76, 0x69, 0x65, 0x77, 0x12, 0x12, 0x0a, 0x04, 0x66, + 0x72, 0x6f, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, + 0x1e, 0x0a, 0x0a, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x2a, + 0x37, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, + 0x0a, 0x08, 0x50, 0x52, 0x4f, 0x50, 0x4f, 0x53, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, + 0x50, 0x52, 0x45, 0x56, 0x4f, 0x54, 0x45, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x50, 0x52, 0x45, + 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x02, 0x42, 0x0b, 0x5a, 0x09, 0x2f, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -491,15 +556,17 @@ var file_messages_proto_messages_proto_goTypes = []interface{}{ } var file_messages_proto_messages_proto_depIdxs = []int32{ 0, // 0: Message.type:type_name -> MessageType - 1, // 1: Message.view:type_name -> View - 3, // 2: Message.proposalMessage:type_name -> ProposalMessage - 4, // 3: Message.prevoteMessage:type_name -> PrevoteMessage - 5, // 4: Message.precommitMessage:type_name -> PrecommitMessage - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 3, // 1: Message.proposalMessage:type_name -> ProposalMessage + 4, // 2: Message.prevoteMessage:type_name -> PrevoteMessage + 5, // 3: Message.precommitMessage:type_name -> PrecommitMessage + 1, // 4: ProposalMessage.view:type_name -> View + 1, // 5: PrevoteMessage.view:type_name -> View + 1, // 6: PrecommitMessage.view:type_name -> View + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_messages_proto_messages_proto_init() } diff --git a/messages/proto/messages.proto b/messages/proto/messages.proto index 58786b61af8..d2de47182a6 100644 --- a/messages/proto/messages.proto +++ b/messages/proto/messages.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -option go_package = "/messages/proto"; +option go_package = "/messages"; // MessageType defines the types of messages // that are related to the consensus process @@ -25,12 +25,11 @@ message Message { // type is the type of consensus message MessageType type = 1; - // view is the current view for the message - // (the view in which the message was sent) - View view = 2; + // the message signature of the sender, if any + bytes signature = 2; // data is the underlying wrapped message - oneof data { + oneof payload { ProposalMessage proposalMessage = 3; PrevoteMessage prevoteMessage = 4; PrecommitMessage precommitMessage = 5; @@ -41,6 +40,13 @@ message Message { // the consensus proposal for the view // message ProposalMessage { + // view is the current view for the message + // (the view in which the message was sent) + View view = 1; + + // from is the message sender (unique identifier) + bytes from = 2; + // proposal is the actual consensus proposal bytes proposal = 3; @@ -59,10 +65,17 @@ message ProposalMessage { // for which this prevote is meant for (ex. proposal hash) // message PrevoteMessage { + // view is the current view for the message + // (the view in which the message was sent) + View view = 1; + + // from is the message sender (unique identifier) + bytes from = 2; + // identifier is the unique identifier for // the proposal associated with this // prevote message (ex. proposal hash) - bytes identifier = 1; + bytes identifier = 3; } // PrecommitMessage is the message @@ -72,8 +85,15 @@ message PrevoteMessage { // for which this precommit is meant for (ex. proposal hash) // message PrecommitMessage { + // view is the current view for the message + // (the view in which the message was sent) + View view = 1; + + // from is the message sender (unique identifier) + bytes from = 2; + // identifier is the unique identifier for // the proposal associated with this // precommit message (ex. proposal hash) - bytes identifier = 1; + bytes identifier = 3; } \ No newline at end of file diff --git a/messages/subscription.go b/messages/subscription.go new file mode 100644 index 00000000000..d537566d820 --- /dev/null +++ b/messages/subscription.go @@ -0,0 +1,57 @@ +package messages + +import "github.com/rs/xid" + +type ( + // MsgCallback is the callback that returns all given messages + MsgCallback[T msgType] func() []*T + + // subscriptions is the subscription store, + // maps subscription id -> notification channel. + // Usage of this type is NOT thread safe + subscriptions[T msgType] map[string]chan MsgCallback[T] +) + +// add adds a new subscription to the subscription map. +// Returns the subscription ID, and update channel +func (s *subscriptions[T]) add() (string, chan MsgCallback[T]) { + var ( + id = xid.New().String() + ch = make(chan MsgCallback[T], 1) + ) + + (*s)[id] = ch + + return id, ch +} + +// remove removes the given subscription +func (s *subscriptions[T]) remove(id string) { + if ch := (*s)[id]; ch != nil { + // Close the notification channel + close(ch) + } + + // Delete the subscription + delete(*s, id) +} + +// notify notifies all subscription listeners +func (s *subscriptions[T]) notify(callback MsgCallback[T]) { + // Notify the listeners + for _, ch := range *s { + notifySubscription(ch, callback) + } +} + +// notifySubscription alerts the notification channel +// about a callback. This function is pure syntactic sugar +func notifySubscription[T msgType]( + ch chan MsgCallback[T], + callback MsgCallback[T], +) { + select { + case ch <- callback: + default: + } +}