Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: upstream more changes from v2 #20387

Merged
merged 11 commits into from
May 17, 2024
Merged

chore: upstream more changes from v2 #20387

merged 11 commits into from
May 17, 2024

Conversation

tac0turtle
Copy link
Member

@tac0turtle tac0turtle commented May 14, 2024

Description

this pr upstreams more changes from server v2


Author Checklist

All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.

I have...

  • included the correct type prefix in the PR title
  • confirmed ! in the type prefix if API or client breaking change
  • targeted the correct branch (see PR Targeting)
  • provided a link to the relevant issue or specification
  • reviewed "Files changed" and left comments if necessary
  • included the necessary unit and integration tests
  • added a changelog entry to CHANGELOG.md
  • updated the relevant documentation or specification, including comments for documenting Go code
  • confirmed all CI checks have passed

Reviewers Checklist

All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.

I have...

  • confirmed the correct type prefix in the PR title
  • confirmed all author checklist items have been addressed
  • reviewed state machine logic, API design and naming, documentation is accurate, tests and test coverage

Summary by CodeRabbit

  • New Features

    • Added message types and service for streaming functionalities related to block delivery, state changes, and transaction execution results.
    • Introduced a DefaultConfig function for gRPC server configuration.
    • Implemented gRPC reflection for gogoproto consumers.
    • Added a search bar and search functionality to the app.
  • Documentation

    • Added documentation for the State Streaming Plugin (gRPC) in the Server/v2 package.
  • Tests

    • Added test cases for server functionalities and streaming plugin functionality.
  • Chores

    • Updated dependencies versions and hashes in the gomod2nix.toml file.

Copy link
Contributor

coderabbitai bot commented May 14, 2024

Walkthrough

Walkthrough

The updates involve significant enhancements across various server modules, focusing on gRPC server configurations, telemetry, streaming plugins, and CLI command management. New message types and services were introduced for streaming functionalities, while the gRPC server and gateway underwent configuration and functionality updates. The telemetry package was enriched with metrics management capabilities, and a new plugin system for state streaming was implemented. These changes aim to enhance server configurability, monitoring, and extensibility.

Changes

File(s) Change Summary
proto/cosmos/streaming/v1/grpc.proto Added new message types and ListenerService for block delivery and state changes.
server/v2/api/grpc/config.go
server/v2/api/grpc/gogoreflection/doc.go
server/v2/api/grpc/gogoreflection/fix_registration.go
server/v2/api/grpc/gogoreflection/fix_registration_test.go
Introduced gRPC configuration updates, gogoreflection package, and registration handling functions.
server/v2/api/grpc/gogoreflection/serverreflection.go Implemented server reflection service for gRPC.
server/v2/api/grpc/server.go Introduced GRPCServer struct and related functions for server setup and message encoding/decoding.
server/v2/api/grpcgateway/config.go
server/v2/api/grpcgateway/server.go
Added gRPC-gateway configuration and server implementation.
server/v2/api/telemetry/config.go
server/v2/api/telemetry/metrics.go
server/v2/api/telemetry/server.go
Enhanced telemetry package with metrics management and registration functions.
server/v2/commands.go Added CLI command management functions with error handling.
server/v2/config.go Defined server configuration structs.
server/v2/flags.go
server/v2/logger.go
Added constants for logging flags and logger creation function.
server/v2/server.go
server/v2/server_mock_test.go
server/v2/server_test.go
Introduced server module management, mock server, and test cases.
server/v2/streaming/README.md
server/v2/streaming/config.go
server/v2/streaming/context.go
Added streaming plugin system documentation and context interface.
server/v2/streaming/examples/file/.gitignore
server/v2/streaming/examples/file/README.md
server/v2/streaming/examples/file/file.go
Updated file plugin and introduced example file plugin functionality.
server/v2/streaming/examples/stdout/stdout.go Implemented ABCIListener for processing and outputting data.
server/v2/streaming/grpc.go
server/v2/streaming/interface.go
Introduced gRPC server and interface for streaming services.
server/v2/streaming/plugin.md Documented State Streaming Plugin (gRPC) details.
server/v2/streaming/streaming.go
server/v2/streaming/streaming_test.go
Added functions for managing streaming plugins and test cases.
server/v2/streaming/types.go
server/v2/streaming/utils.go
Defined Manager struct and utility functions for streaming.
server/v2/testdata/app.toml Added configurations for gRPC and mock server settings.

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger a review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

server/v2/streaming/examples/file/file.go Dismissed Show dismissed Hide dismissed
server/v2/commands.go Dismissed Show dismissed Hide dismissed
server/v2/api/grpc/gogoreflection/fix_registration.go Dismissed Show dismissed Hide dismissed
server/v2/api/grpc/gogoreflection/serverreflection.go Dismissed Show dismissed Hide dismissed
server/v2/api/grpc/gogoreflection/fix_registration.go Dismissed Show dismissed Hide dismissed
server/v2/api/grpc/gogoreflection/fix_registration.go Dismissed Show dismissed Hide dismissed
server/v2/api/grpc/gogoreflection/fix_registration.go Dismissed Show dismissed Hide dismissed
server/v2/api/grpc/gogoreflection/fix_registration.go Dismissed Show dismissed Hide dismissed
@tac0turtle tac0turtle marked this pull request as ready for review May 15, 2024 13:36
@tac0turtle tac0turtle requested a review from a team as a code owner May 15, 2024 13:36
@@ -0,0 +1,30 @@
# Cosmos-SDK Plugins

This package contains an extensible plugin system for the Cosmos-SDK. The plugin system leverages the [hashicorp/go-plugin](https://github.com/hashicorp/go-plugin) system. This system is designed to work over RPC.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing space.

- This package contains an extensible plugin system for the Cosmos-SDK. The plugin system leverages the [hashicorp/go-plugin](https://github.com/hashicorp/go-plugin) system. This system is designed to work over RPC. 
+ This package contains an extensible plugin system for the Cosmos-SDK. The plugin system leverages the [hashicorp/go-plugin](https://github.com/hashicorp/go-plugin) system. This system is designed to work over RPC.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
This package contains an extensible plugin system for the Cosmos-SDK. The plugin system leverages the [hashicorp/go-plugin](https://github.com/hashicorp/go-plugin) system. This system is designed to work over RPC.
This package contains an extensible plugin system for the Cosmos-SDK. The plugin system leverages the [hashicorp/go-plugin](https://github.com/hashicorp/go-plugin) system. This system is designed to work over RPC.


## Pre requisites

For an overview of supported features by the `go-plugin` system, please see https://github.com/hashicorp/go-plugin. The `go-plugin` documentation is located [here](https://github.com/hashicorp/go-plugin/tree/master/docs). You can also directly visit any of the links below:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using bare URLs.

- For an overview of supported features by the `go-plugin` system, please see https://github.com/hashicorp/go-plugin. The `go-plugin` documentation is located [here](https://github.com/hashicorp/go-plugin/tree/master/docs). You can also directly visit any of the links below:
+ For an overview of supported features by the `go-plugin` system, please see the [go-plugin GitHub page](https://github.com/hashicorp/go-plugin). The `go-plugin` documentation is located [here](https://github.com/hashicorp/go-plugin/tree/master/docs). You can also directly visit any of the links below:

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
For an overview of supported features by the `go-plugin` system, please see https://github.com/hashicorp/go-plugin. The `go-plugin` documentation is located [here](https://github.com/hashicorp/go-plugin/tree/master/docs). You can also directly visit any of the links below:
For an overview of supported features by the `go-plugin` system, please see the [go-plugin GitHub page](https://github.com/hashicorp/go-plugin). The `go-plugin` documentation is located [here](https://github.com/hashicorp/go-plugin/tree/master/docs). You can also directly visit any of the links below:

Comment on lines +5 to +17
func DefaultConfig() *Config {
return &Config{
Enable: true,
// DefaultGRPCAddress defines the default address to bind the gRPC server to.
Address: "localhost:9090",
// DefaultGRPCMaxRecvMsgSize defines the default gRPC max message size in
// bytes the server can receive.
MaxRecvMsgSize: 1024 * 1024 * 10,
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
MaxSendMsgSize: math.MaxInt32,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding documentation for the DefaultConfig function.

+ // DefaultConfig returns the default configuration for the gRPC server.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func DefaultConfig() *Config {
return &Config{
Enable: true,
// DefaultGRPCAddress defines the default address to bind the gRPC server to.
Address: "localhost:9090",
// DefaultGRPCMaxRecvMsgSize defines the default gRPC max message size in
// bytes the server can receive.
MaxRecvMsgSize: 1024 * 1024 * 10,
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
MaxSendMsgSize: math.MaxInt32,
}
}
// DefaultConfig returns the default configuration for the gRPC server.
func DefaultConfig() *Config {
return &Config{
Enable: true,
// DefaultGRPCAddress defines the default address to bind the gRPC server to.
Address: "localhost:9090",
// DefaultGRPCMaxRecvMsgSize defines the default gRPC max message size in
// bytes the server can receive.
MaxRecvMsgSize: 1024 * 1024 * 10,
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
MaxSendMsgSize: math.MaxInt32,
}
}

Comment on lines +19 to +34
// GRPCConfig defines configuration for the gRPC server.
type Config struct {
// Enable defines if the gRPC server should be enabled.
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC server should be enabled."`

// Address defines the API server to listen on
Address string `mapstructure:"address" toml:"address" comment:"Address defines the gRPC server address to bind to."`

// MaxRecvMsgSize defines the max message size in bytes the server can receive.
// The default value is 10MB.
MaxRecvMsgSize int `mapstructure:"max-recv-msg-size" toml:"max-recv-msg-size" comment:"MaxRecvMsgSize defines the max message size in bytes the server can receive.\nThe default value is 10MB."`

// MaxSendMsgSize defines the max message size in bytes the server can send.
// The default value is math.MaxInt32.
MaxSendMsgSize int `mapstructure:"max-send-msg-size" toml:"max-send-msg-size" comment:"MaxSendMsgSize defines the max message size in bytes the server can send.\nThe default value is math.MaxInt32."`
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding documentation for the Config struct.

+ // Config defines the configuration for the gRPC server.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// GRPCConfig defines configuration for the gRPC server.
type Config struct {
// Enable defines if the gRPC server should be enabled.
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC server should be enabled."`
// Address defines the API server to listen on
Address string `mapstructure:"address" toml:"address" comment:"Address defines the gRPC server address to bind to."`
// MaxRecvMsgSize defines the max message size in bytes the server can receive.
// The default value is 10MB.
MaxRecvMsgSize int `mapstructure:"max-recv-msg-size" toml:"max-recv-msg-size" comment:"MaxRecvMsgSize defines the max message size in bytes the server can receive.\nThe default value is 10MB."`
// MaxSendMsgSize defines the max message size in bytes the server can send.
// The default value is math.MaxInt32.
MaxSendMsgSize int `mapstructure:"max-send-msg-size" toml:"max-send-msg-size" comment:"MaxSendMsgSize defines the max message size in bytes the server can send.\nThe default value is math.MaxInt32."`
}

// Config defines the configuration for the gRPC server.
type Config struct {
// Enable defines if the gRPC server should be enabled.
Enable bool mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC server should be enabled."

// Address defines the API server to listen on
Address string `mapstructure:"address" toml:"address" comment:"Address defines the gRPC server address to bind to."`

// MaxRecvMsgSize defines the max message size in bytes the server can receive.
// The default value is 10MB.
MaxRecvMsgSize int `mapstructure:"max-recv-msg-size" toml:"max-recv-msg-size" comment:"MaxRecvMsgSize defines the max message size in bytes the server can receive.\nThe default value is 10MB."`

// MaxSendMsgSize defines the max message size in bytes the server can send.
// The default value is math.MaxInt32.
MaxSendMsgSize int `mapstructure:"max-send-msg-size" toml:"max-send-msg-size" comment:"MaxSendMsgSize defines the max message size in bytes the server can send.\nThe default value is math.MaxInt32."`

}


</details>
<!-- suggestion_end -->

<!-- This is an auto-generated comment by CodeRabbit -->

Comment on lines +28 to +30
func GetPluginEnvKey(name string) string {
return fmt.Sprintf("%s_%s", pluginEnvKeyPrefix, strings.ToUpper(name))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding documentation for the GetPluginEnvKey function.

+ // GetPluginEnvKey generates an environment key for a plugin.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func GetPluginEnvKey(name string) string {
return fmt.Sprintf("%s_%s", pluginEnvKeyPrefix, strings.ToUpper(name))
}
// GetPluginEnvKey generates an environment key for a plugin.
func GetPluginEnvKey(name string) string {
return fmt.Sprintf("%s_%s", pluginEnvKeyPrefix, strings.ToUpper(name))
}

Comment on lines +32 to +63
func NewStreamingPlugin(name, logLevel string) (interface{}, error) {
logger := hclog.New(&hclog.LoggerOptions{
Output: hclog.DefaultOutput,
Level: toHclogLevel(logLevel),
Name: fmt.Sprintf("plugin.%s", name),
})

// We're a host. Start by launching the streaming process.
env := os.Getenv(GetPluginEnvKey(name))
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: HandshakeMap[name],
Managed: true,
Plugins: PluginMap,
// For verifying the integrity of executables see SecureConfig documentation
// https://pkg.go.dev/github.com/hashicorp/go-plugin#SecureConfig
//#nosec G204 -- Required to load plugins
Cmd: exec.Command("sh", "-c", env),
Logger: logger,
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolNetRPC, plugin.ProtocolGRPC,
},
})

// Connect via RPC
rpcClient, err := client.Client()
if err != nil {
return nil, err
}

// Request streaming plugin
return rpcClient.Dispense(name)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding documentation for the NewStreamingPlugin function.

+ // NewStreamingPlugin creates a new streaming plugin with the specified name and log level.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func NewStreamingPlugin(name, logLevel string) (interface{}, error) {
logger := hclog.New(&hclog.LoggerOptions{
Output: hclog.DefaultOutput,
Level: toHclogLevel(logLevel),
Name: fmt.Sprintf("plugin.%s", name),
})
// We're a host. Start by launching the streaming process.
env := os.Getenv(GetPluginEnvKey(name))
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: HandshakeMap[name],
Managed: true,
Plugins: PluginMap,
// For verifying the integrity of executables see SecureConfig documentation
// https://pkg.go.dev/github.com/hashicorp/go-plugin#SecureConfig
//#nosec G204 -- Required to load plugins
Cmd: exec.Command("sh", "-c", env),
Logger: logger,
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolNetRPC, plugin.ProtocolGRPC,
},
})
// Connect via RPC
rpcClient, err := client.Client()
if err != nil {
return nil, err
}
// Request streaming plugin
return rpcClient.Dispense(name)
}
// NewStreamingPlugin creates a new streaming plugin with the specified name and log level.
func NewStreamingPlugin(name, logLevel string) (interface{}, error) {
logger := hclog.New(&hclog.LoggerOptions{
Output: hclog.DefaultOutput,
Level: toHclogLevel(logLevel),
Name: fmt.Sprintf("plugin.%s", name),
})
// We're a host. Start by launching the streaming process.
env := os.Getenv(GetPluginEnvKey(name))
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: HandshakeMap[name],
Managed: true,
Plugins: PluginMap,
// For verifying the integrity of executables see SecureConfig documentation
// https://pkg.go.dev/github.com/hashicorp/go-plugin#SecureConfig
//#nosec G204 -- Required to load plugins
Cmd: exec.Command("sh", "-c", env),
Logger: logger,
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolNetRPC, plugin.ProtocolGRPC,
},
})
// Connect via RPC
rpcClient, err := client.Client()
if err != nil {
return nil, err
}
// Request streaming plugin
return rpcClient.Dispense(name)
}

Comment on lines +36 to +42
// TODO split this test into multiple tests
// test read config
// test write config
// test server configs
// test start empty
// test start config exists
// test stop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider splitting the TestServer function into multiple smaller tests to improve readability and maintainability. Each test can focus on a specific aspect such as reading config, writing config, server configurations, starting empty, starting with existing config, and stopping the server.

Comment on lines +43 to +115
func TestServer(t *testing.T) {
currentDir, err := os.Getwd()
if err != nil {
t.Log(err)
t.Fail()
}
configPath := filepath.Join(currentDir, "testdata", "app.toml")

v, err := serverv2.ReadConfig(configPath)
if err != nil {
v = viper.New()
}

logger := log.NewLogger(os.Stdout)
grpcServer, err := grpc.New(logger, v, &mockInterfaceRegistry{}, &mockGRPCService{})
if err != nil {
t.Log(err)
t.Fail()
}

mockServer := &mockServer{name: "mock-server-1", ch: make(chan string, 100)}

server := serverv2.NewServer(
logger,
grpcServer,
mockServer,
)

serverCfgs := server.Configs()
if serverCfgs[grpcServer.Name()].(*grpc.Config).Address != grpc.DefaultConfig().Address {
t.Logf("config is not equal: %v", serverCfgs[grpcServer.Name()])
t.Fail()
}
if serverCfgs[mockServer.Name()].(*mockServerConfig).MockFieldOne != MockServerDefaultConfig().MockFieldOne {
t.Logf("config is not equal: %v", serverCfgs[mockServer.Name()])
t.Fail()
}

// write config
if err := server.WriteConfig(configPath); err != nil {
t.Log(err)
t.Fail()
}

v, err = serverv2.ReadConfig(configPath)
if err != nil {
t.Log(err) // config should be created by WriteConfig
t.FailNow()
}
if v.GetString(grpcServer.Name()+".address") != grpc.DefaultConfig().Address {
t.Logf("config is not equal: %v", v)
t.Fail()
}

// start empty
ctx := context.Background()
ctx = context.WithValue(ctx, serverv2.ServerContextKey, serverv2.Config{StartBlock: true})
ctx, cancelFn := context.WithCancel(ctx)
go func() {
// wait 5sec and cancel context
<-time.After(5 * time.Second)
cancelFn()

if err := server.Stop(ctx); err != nil {
t.Logf("failed to stop servers: %s", err)
t.Fail()
}
}()

if err := server.Start(ctx); err != nil {
t.Log(err)
t.Fail()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve the error messages and assertions to provide more context in case of test failures. This will help in debugging and understanding the cause of the failure.

Example:

- t.Log(err)
+ t.Logf("failed to get current directory: %v", err)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func TestServer(t *testing.T) {
currentDir, err := os.Getwd()
if err != nil {
t.Log(err)
t.Fail()
}
configPath := filepath.Join(currentDir, "testdata", "app.toml")
v, err := serverv2.ReadConfig(configPath)
if err != nil {
v = viper.New()
}
logger := log.NewLogger(os.Stdout)
grpcServer, err := grpc.New(logger, v, &mockInterfaceRegistry{}, &mockGRPCService{})
if err != nil {
t.Log(err)
t.Fail()
}
mockServer := &mockServer{name: "mock-server-1", ch: make(chan string, 100)}
server := serverv2.NewServer(
logger,
grpcServer,
mockServer,
)
serverCfgs := server.Configs()
if serverCfgs[grpcServer.Name()].(*grpc.Config).Address != grpc.DefaultConfig().Address {
t.Logf("config is not equal: %v", serverCfgs[grpcServer.Name()])
t.Fail()
}
if serverCfgs[mockServer.Name()].(*mockServerConfig).MockFieldOne != MockServerDefaultConfig().MockFieldOne {
t.Logf("config is not equal: %v", serverCfgs[mockServer.Name()])
t.Fail()
}
// write config
if err := server.WriteConfig(configPath); err != nil {
t.Log(err)
t.Fail()
}
v, err = serverv2.ReadConfig(configPath)
if err != nil {
t.Log(err) // config should be created by WriteConfig
t.FailNow()
}
if v.GetString(grpcServer.Name()+".address") != grpc.DefaultConfig().Address {
t.Logf("config is not equal: %v", v)
t.Fail()
}
// start empty
ctx := context.Background()
ctx = context.WithValue(ctx, serverv2.ServerContextKey, serverv2.Config{StartBlock: true})
ctx, cancelFn := context.WithCancel(ctx)
go func() {
// wait 5sec and cancel context
<-time.After(5 * time.Second)
cancelFn()
if err := server.Stop(ctx); err != nil {
t.Logf("failed to stop servers: %s", err)
t.Fail()
}
}()
if err := server.Start(ctx); err != nil {
t.Log(err)
t.Fail()
}
func TestServer(t *testing.T) {
currentDir, err := os.Getwd()
if err != nil {
t.Logf("failed to get current directory: %v", err)
t.Fail()
}
configPath := filepath.Join(currentDir, "testdata", "app.toml")
v, err := serverv2.ReadConfig(configPath)
if err != nil {
v = viper.New()
}
logger := log.NewLogger(os.Stdout)
grpcServer, err := grpc.New(logger, v, &mockInterfaceRegistry{}, &mockGRPCService{})
if err != nil {
t.Logf("failed to create gRPC server: %v", err)
t.Fail()
}
mockServer := &mockServer{name: "mock-server-1", ch: make(chan string, 100)}
server := serverv2.NewServer(
logger,
grpcServer,
mockServer,
)
serverCfgs := server.Configs()
if serverCfgs[grpcServer.Name()].(*grpc.Config).Address != grpc.DefaultConfig().Address {
t.Logf("gRPC server config is not equal: %v", serverCfgs[grpcServer.Name()])
t.Fail()
}
if serverCfgs[mockServer.Name()].(*mockServerConfig).MockFieldOne != MockServerDefaultConfig().MockFieldOne {
t.Logf("mock server config is not equal: %v", serverCfgs[mockServer.Name()])
t.Fail()
}
// write config
if err := server.WriteConfig(configPath); err != nil {
t.Logf("failed to write config: %v", err)
t.Fail()
}
v, err = serverv2.ReadConfig(configPath)
if err != nil {
t.Logf("failed to read config after writing: %v", err) // config should be created by WriteConfig
t.FailNow()
}
if v.GetString(grpcServer.Name()+".address") != grpc.DefaultConfig().Address {
t.Logf("gRPC server address config is not equal: %v", v)
t.Fail()
}
// start empty
ctx := context.Background()
ctx = context.WithValue(ctx, serverv2.ServerContextKey, serverv2.Config{StartBlock: true})
ctx, cancelFn := context.WithCancel(ctx)
go func() {
// wait 5sec and cancel context
<-time.After(5 * time.Second)
cancelFn()
if err := server.Stop(ctx); err != nil {
t.Logf("failed to stop servers: %s", err)
t.Fail()
}
}()
if err := server.Start(ctx); err != nil {
t.Logf("failed to start server: %v", err)
t.Fail()
}

@@ -0,0 +1,212 @@
# State Streaming Plugin (gRPC)

<!-- TODO fix docs before final release -->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the TODO comment before the final release.

The TODO comment indicates that the documentation needs fixing before the final release. Ensure all necessary updates are made and remove this comment.


## Implementation

In this section we describe the implementation of the `ABCIListener` interface as a gRPC service.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comma after "implementation".

- In this section we describe the implementation of the `ABCIListener` interface as a gRPC service.
+ In this section, we describe the implementation of the `ABCIListener` interface as a gRPC service.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
In this section we describe the implementation of the `ABCIListener` interface as a gRPC service.
In this section, we describe the implementation of the `ABCIListener` interface as a gRPC service.

make proto-gen
```

For other languages you'll need to [download](https://github.com/cosmos/cosmos-sdk/blob/main/third_party/proto/README.md)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comma after "you'll need to".

- For other languages you'll need to [download](https://github.com/cosmos/cosmos-sdk/blob/main/third_party/proto/README.md)
+ For other languages, you'll need to [download](https://github.com/cosmos/cosmos-sdk/blob/main/third_party/proto/README.md)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
For other languages you'll need to [download](https://github.com/cosmos/cosmos-sdk/blob/main/third_party/proto/README.md)
For other languages, you'll need to [download](https://github.com/cosmos/cosmos-sdk/blob/main/third_party/proto/README.md)

```

For other languages you'll need to [download](https://github.com/cosmos/cosmos-sdk/blob/main/third_party/proto/README.md)
the CosmosSDK protos into your project and compile. For language specific compilation instructions visit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use "language-specific" with a hyphen.

- For language specific compilation instructions visit
+ For language-specific compilation instructions visit

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
the CosmosSDK protos into your project and compile. For language specific compilation instructions visit
the CosmosSDK protos into your project and compile. For language-specific compilation instructions visit


### gRPC Client and Server

Implementing the ABCIListener gRPC client and server is a simple and straight forward process.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use "straightforward" as one word.

- Implementing the ABCIListener gRPC client and server is a simple and straight forward process.
+ Implementing the ABCIListener gRPC client and server is a simple and straightforward process.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
Implementing the ABCIListener gRPC client and server is a simple and straight forward process.
Implementing the ABCIListener gRPC client and server is a simple and straightforward process.

Comment on lines +65 to +80
func toHclogLevel(s string) hclog.Level {
switch s {
case "trace":
return hclog.Trace
case "debug":
return hclog.Debug
case "info":
return hclog.Info
case "warn":
return hclog.Warn
case "error":
return hclog.Error
default:
return hclog.DefaultLevel
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding documentation for the toHclogLevel function.

+ // toHclogLevel converts a string log level to an hclog.Level.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func toHclogLevel(s string) hclog.Level {
switch s {
case "trace":
return hclog.Trace
case "debug":
return hclog.Debug
case "info":
return hclog.Info
case "warn":
return hclog.Warn
case "error":
return hclog.Error
default:
return hclog.DefaultLevel
}
}
// toHclogLevel converts a string log level to an hclog.Level.
func toHclogLevel(s string) hclog.Level {
switch s {
case "trace":
return hclog.Trace
case "debug":
return hclog.Debug
case "info":
return hclog.Info
case "warn":
return hclog.Warn
case "error":
return hclog.Error
default:
return hclog.DefaultLevel
}
}


Implementing the ABCIListener gRPC client and server is a simple and straight forward process.

To create the client and server we create a `ListenerGRPCPlugin` struct that implements the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comma after "server".

- To create the client and server we create a `ListenerGRPCPlugin` struct
+ To create the client and server, we create a `ListenerGRPCPlugin` struct

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
To create the client and server we create a `ListenerGRPCPlugin` struct that implements the
To create the client and server, we create a `ListenerGRPCPlugin` struct that implements the

Comment on lines +79 to +101
...
}

func (a FilePlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
// process data
return nil
}

func (a FilePlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error {
// process data
return nil
}

func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: v1.Handshake,
Plugins: map[string]plugin.Plugin{
"abci": &ABCIListenerGRPCPlugin{Impl: &MyPlugin{}},
},

// A non-nil value here enables gRPC serving for this streaming...
GRPCServer: plugin.DefaultGRPCServer,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace hard tabs with spaces for consistent indentation.

-	type MyPlugin struct {
-	...
-}
+ type MyPlugin struct {
+ ...
+ }

- func (a FilePlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
-	// process data
-	return nil
- }

- func (a FilePlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error {
-    // process data
-    return nil
- }

- func main() {
-	plugin.Serve(&plugin.ServeConfig{
-		HandshakeConfig: v1.Handshake,
-		Plugins: map[string]plugin.Plugin{
-			"abci": &ABCIListenerGRPCPlugin{Impl: &MyPlugin{}},
-		},
-		// A non-nil value here enables gRPC serving for this streaming...
-		GRPCServer: plugin.DefaultGRPCServer,
-	})
- }
+ func (a FilePlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
+   // process data
+   return nil
+ }

+ func (a FilePlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error {
+   // process data
+   return nil
+ }

+ func main() {
+   plugin.Serve(&plugin.ServeConfig{
+     HandshakeConfig: v1.Handshake,
+     Plugins: map[string]plugin.Plugin{
+       "abci": &ABCIListenerGRPCPlugin{Impl: &MyPlugin{}},
+     },
+     // A non-nil value here enables gRPC serving for this streaming...
+     GRPCServer: plugin.DefaultGRPCServer,
+   })
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
...
}
func (a FilePlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
// process data
return nil
}
func (a FilePlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error {
// process data
return nil
}
func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: v1.Handshake,
Plugins: map[string]plugin.Plugin{
"abci": &ABCIListenerGRPCPlugin{Impl: &MyPlugin{}},
},
// A non-nil value here enables gRPC serving for this streaming...
GRPCServer: plugin.DefaultGRPCServer,
})
type MyPlugin struct {
...
}
func (a FilePlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
// process data
return nil
}
func (a FilePlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error {
// process data
return nil
}
func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: v1.Handshake,
Plugins: map[string]plugin.Plugin{
"abci": &ABCIListenerGRPCPlugin{Impl: &MyPlugin{}},
},
// A non-nil value here enables gRPC serving for this streaming...
GRPCServer: plugin.DefaultGRPCServer,
})
}

Comment on lines +18 to +19
// ListenCommit updates the steaming service with the latest Commit messages and state changes
ListenStateChanges(ctx context.Context, changeSet []*StoreKVPair) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the typo in the comment.

- // ListenCommit updates the steaming service with the latest Commit messages and state changes
+ // ListenStateChanges updates the streaming service with the latest Commit messages and state changes

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// ListenCommit updates the steaming service with the latest Commit messages and state changes
ListenStateChanges(ctx context.Context, changeSet []*StoreKVPair) error
// ListenStateChanges updates the streaming service with the latest Commit messages and state changes
ListenStateChanges(ctx context.Context, changeSet []*StoreKVPair) error

}

func (a *FilePlugin) ListenStateChanges(ctx context.Context, changeSet []*streaming.StoreKVPair) error {
fmt.Printf("listen-commit: block_height=%d data=%v", a.BlockHeight, changeSet)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider removing or conditionally compiling the print statement to avoid unnecessary console output in production.

- fmt.Printf("listen-commit: block_height=%d data=%v", a.BlockHeight, changeSet)
+ // fmt.Printf("listen-commit: block_height=%d data=%v", a.BlockHeight, changeSet)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
fmt.Printf("listen-commit: block_height=%d data=%v", a.BlockHeight, changeSet)
// fmt.Printf("listen-commit: block_height=%d data=%v", a.BlockHeight, changeSet)

Comment on lines +22 to +24
// Handshake is a common handshake that is shared by streaming and host.
// This prevents users from executing bad plugins or executing a plugin
// directory. It is a UX feature, not a security feature.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve the comment for clarity.

- // This prevents users from executing bad plugins or executing a plugin
+ // This prevents users from executing bad plugins or executing a plugin from a directory.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// Handshake is a common handshake that is shared by streaming and host.
// This prevents users from executing bad plugins or executing a plugin
// directory. It is a UX feature, not a security feature.
// Handshake is a common handshake that is shared by streaming and host.
// This prevents users from executing bad plugins or executing a plugin from a directory.
// It is a UX feature, not a security feature.

Comment on lines +33 to +53
func getExtension(extID int32, m proto.Message) *gogoproto.ExtensionDesc {
// check first in gogoproto registry
for id, desc := range gogoproto.RegisteredExtensions(m) {
if id == extID {
return desc
}
}

// check into proto registry
for id, desc := range proto.RegisteredExtensions(m) { //nolint:staticcheck // keep for backward compatibility
if id == extID {
return &gogoproto.ExtensionDesc{
ExtendedType: desc.ExtendedType, //nolint:staticcheck // keep for backward compatibility
ExtensionType: desc.ExtensionType, //nolint:staticcheck // keep for backward compatibility
Field: desc.Field, //nolint:staticcheck // keep for backward compatibility
Name: desc.Name, //nolint:staticcheck // keep for backward compatibility
Tag: desc.Tag, //nolint:staticcheck // keep for backward compatibility
Filename: desc.Filename, //nolint:staticcheck // keep for backward compatibility
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the potential non-determinism introduced by iterating over maps.

+ // Note: Iteration over maps may introduce non-determinism.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func getExtension(extID int32, m proto.Message) *gogoproto.ExtensionDesc {
// check first in gogoproto registry
for id, desc := range gogoproto.RegisteredExtensions(m) {
if id == extID {
return desc
}
}
// check into proto registry
for id, desc := range proto.RegisteredExtensions(m) { //nolint:staticcheck // keep for backward compatibility
if id == extID {
return &gogoproto.ExtensionDesc{
ExtendedType: desc.ExtendedType, //nolint:staticcheck // keep for backward compatibility
ExtensionType: desc.ExtensionType, //nolint:staticcheck // keep for backward compatibility
Field: desc.Field, //nolint:staticcheck // keep for backward compatibility
Name: desc.Name, //nolint:staticcheck // keep for backward compatibility
Tag: desc.Tag, //nolint:staticcheck // keep for backward compatibility
Filename: desc.Filename, //nolint:staticcheck // keep for backward compatibility
}
}
}
func getExtension(extID int32, m proto.Message) *gogoproto.ExtensionDesc {
// check first in gogoproto registry
// Note: Iteration over maps may introduce non-determinism.
for id, desc := range gogoproto.RegisteredExtensions(m) {
if id == extID {
return desc
}
}
// check into proto registry
for id, desc := range proto.RegisteredExtensions(m) { //nolint:staticcheck // keep for backward compatibility
if id == extID {
return &gogoproto.ExtensionDesc{
ExtendedType: desc.ExtendedType, //nolint:staticcheck // keep for backward compatibility
ExtensionType: desc.ExtensionType, //nolint:staticcheck // keep for backward compatibility
Field: desc.Field, //nolint:staticcheck // keep for backward compatibility
Name: desc.Name, //nolint:staticcheck // keep for backward compatibility
Tag: desc.Tag, //nolint:staticcheck // keep for backward compatibility
Filename: desc.Filename, //nolint:staticcheck // keep for backward compatibility
}
}
}

Comment on lines +35 to +46
// Configs returns a viper instance of the config file
func ReadConfig(configPath string) (*viper.Viper, error) {
v := viper.New()
v.SetConfigFile(configPath)
v.SetConfigType("toml")
if err := v.ReadInConfig(); err != nil {
return nil, fmt.Errorf("failed to read config: %s: %w", configPath, err)
}
v.WatchConfig()

return v, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for the v.WatchConfig() call to handle potential issues with file watching.

Example:

if err := v.WatchConfig(); err != nil {
    return nil, fmt.Errorf("failed to watch config: %w", err)
}

Comment on lines +48 to +144
type Server struct {
logger log.Logger
modules []ServerModule
}

func NewServer(logger log.Logger, modules ...ServerModule) *Server {
return &Server{
logger: logger,
modules: modules,
}
}

func (s *Server) Name() string {
return "server"
}

// Start starts all modules concurrently.
func (s *Server) Start(ctx context.Context) error {
s.logger.Info("starting servers...")

g, ctx := errgroup.WithContext(ctx)
for _, mod := range s.modules {
mod := mod
g.Go(func() error {
return mod.Start(ctx)
})
}

if err := g.Wait(); err != nil {
return fmt.Errorf("failed to start servers: %w", err)
}

serverCfg := ctx.Value(ServerContextKey).(Config)
if serverCfg.StartBlock {
<-ctx.Done()
}

return nil
}

// Stop stops all modules concurrently.
func (s *Server) Stop(ctx context.Context) error {
s.logger.Info("stopping servers...")

g, ctx := errgroup.WithContext(ctx)
for _, mod := range s.modules {
mod := mod
g.Go(func() error {
return mod.Stop(ctx)
})
}

return g.Wait()
}

// CLICommands returns all CLI commands of all modules.
func (s *Server) CLICommands() CLIConfig {
commands := CLIConfig{}
for _, mod := range s.modules {
if climod, ok := mod.(HasCLICommands); ok {
commands.Commands = append(commands.Commands, climod.CLICommands().Commands...)
commands.Queries = append(commands.Queries, climod.CLICommands().Queries...)
commands.Txs = append(commands.Txs, climod.CLICommands().Txs...)
}
}

return commands
}

// Configs returns all configs of all server modules.
func (s *Server) Configs() map[string]any {
cfgs := make(map[string]any)
for _, mod := range s.modules {
if configmod, ok := mod.(HasConfig); ok {
cfg := configmod.Config()
cfgs[mod.Name()] = cfg
}
}

return cfgs
}

// WriteConfig writes the config to the given path.
// Note: it does not use viper.WriteConfigAs because we do not want to store flag values in the config.
func (s *Server) WriteConfig(configPath string) error {
cfgs := s.Configs()
b, err := toml.Marshal(cfgs)
if err != nil {
return fmt.Errorf("failed to marshal config: %w", err)
}

if err := os.WriteFile(configPath, b, 0o600); err != nil {
return fmt.Errorf("failed to write config: %w", err)
}

return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more logging and error handling in the Start and Stop methods to provide better visibility into the server's lifecycle and potential issues.

Example:

if err := g.Wait(); err != nil {
    s.logger.Error("failed to start servers", "err", err)
    return fmt.Errorf("failed to start servers: %w", err)
}

Comment on lines +130 to +144
// WriteConfig writes the config to the given path.
// Note: it does not use viper.WriteConfigAs because we do not want to store flag values in the config.
func (s *Server) WriteConfig(configPath string) error {
cfgs := s.Configs()
b, err := toml.Marshal(cfgs)
if err != nil {
return fmt.Errorf("failed to marshal config: %w", err)
}

if err := os.WriteFile(configPath, b, 0o600); err != nil {
return fmt.Errorf("failed to write config: %w", err)
}

return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more detailed error handling for the os.WriteFile call to handle potential issues with file writing.

Example:

if err := os.WriteFile(configPath, b, 0o600); err != nil {
    s.logger.Error("failed to write config", "path", configPath, "err", err)
    return fmt.Errorf("failed to write config: %w", err)
}

Comment on lines +31 to +80
func (s *PluginTestSuite) SetupTest() {
if runtime.GOOS != "linux" {
s.T().Skip("only run on linux")
}

path, err := os.Getwd()
if err != nil {
s.T().Fail()
}
s.workDir = path

pluginVersion := defaultPlugin
// to write data to files, replace stdout/stdout => file/file
pluginPath := fmt.Sprintf("%s/abci/examples/stdout/stdout", s.workDir)
if err := os.Setenv(GetPluginEnvKey(pluginVersion), pluginPath); err != nil {
s.T().Fail()
}

raw, err := NewStreamingPlugin(pluginVersion, "trace")
require.NoError(s.T(), err, "load", "streaming", "unexpected error")

abciListener, ok := raw.(Listener)
require.True(s.T(), ok, "should pass type check")

logger := log.NewNopLogger()
streamingService := Manager{
Listeners: []Listener{abciListener},
StopNodeOnErr: true,
}
s.loggerCtx = NewMockContext(1, logger, streamingService)

// test abci message types

s.deliverBlockrequest = ListenDeliverBlockRequest{
BlockHeight: s.loggerCtx.BlockHeight(),
Txs: [][]byte{{1, 2, 3, 4, 5, 6, 7, 8, 9}},
Events: []*Event{},
}
s.stateChangeRequest = ListenStateChangesRequest{}

key := []byte("mockStore")
key = append(key, 1, 2, 3)
// test store kv pair types
for range [2000]int{} {
s.changeSet = append(s.changeSet, &StoreKVPair{
Key: key,
Value: []byte{3, 2, 1},
})
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more error handling and logging in the SetupTest function to provide better visibility into the test setup process.

Example:

if err := os.Setenv(GetPluginEnvKey(pluginVersion), pluginPath); err != nil {
    s.T().Logf("failed to set environment variable: %v", err)
    s.T().Fail()
}

Comment on lines +82 to +147
func TestPluginTestSuite(t *testing.T) {
suite.Run(t, new(PluginTestSuite))
}

func (s *PluginTestSuite) TestABCIGRPCPlugin() {
s.T().Run("Should successfully load streaming", func(t *testing.T) {
abciListeners := s.loggerCtx.StreamingManager().Listeners
for _, abciListener := range abciListeners {
for i := range [50]int{} {

err := abciListener.ListenDeliverBlock(s.loggerCtx, s.deliverBlockrequest)
assert.NoError(t, err, "ListenEndBlock")

err = abciListener.ListenStateChanges(s.loggerCtx, s.changeSet)
assert.NoError(t, err, "ListenCommit")

s.updateHeight(int64(i + 1))
}
}
})
}

func (s *PluginTestSuite) updateHeight(n int64) {
s.loggerCtx = NewMockContext(n, s.loggerCtx.Logger(), s.loggerCtx.StreamingManager())
}

var (
_ context.Context = MockContext{}
_ Context = MockContext{}
)

type MockContext struct {
baseCtx context.Context
height int64
logger log.Logger
streamingManager Manager
}

func (m MockContext) BlockHeight() int64 { return m.height }
func (m MockContext) Logger() log.Logger { return m.logger }
func (m MockContext) StreamingManager() Manager { return m.streamingManager }

func NewMockContext(height int64, logger log.Logger, sm Manager) MockContext {
return MockContext{
baseCtx: context.Background(),
height: height,
logger: logger,
streamingManager: sm,
}
}

func (m MockContext) Deadline() (deadline time.Time, ok bool) {
return m.baseCtx.Deadline()
}

func (m MockContext) Done() <-chan struct{} {
return m.baseCtx.Done()
}

func (m MockContext) Err() error {
return m.baseCtx.Err()
}

func (m MockContext) Value(key any) any {
return m.baseCtx.Value(key)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve the error messages and assertions to provide more context in case of test failures. This will help in debugging and understanding the cause of the failure.

Example:

- assert.NoError(t, err, "ListenEndBlock")
+ assert.NoError(t, err, "failed to listen to end block: %v", err)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func TestPluginTestSuite(t *testing.T) {
suite.Run(t, new(PluginTestSuite))
}
func (s *PluginTestSuite) TestABCIGRPCPlugin() {
s.T().Run("Should successfully load streaming", func(t *testing.T) {
abciListeners := s.loggerCtx.StreamingManager().Listeners
for _, abciListener := range abciListeners {
for i := range [50]int{} {
err := abciListener.ListenDeliverBlock(s.loggerCtx, s.deliverBlockrequest)
assert.NoError(t, err, "ListenEndBlock")
err = abciListener.ListenStateChanges(s.loggerCtx, s.changeSet)
assert.NoError(t, err, "ListenCommit")
s.updateHeight(int64(i + 1))
}
}
})
}
func (s *PluginTestSuite) updateHeight(n int64) {
s.loggerCtx = NewMockContext(n, s.loggerCtx.Logger(), s.loggerCtx.StreamingManager())
}
var (
_ context.Context = MockContext{}
_ Context = MockContext{}
)
type MockContext struct {
baseCtx context.Context
height int64
logger log.Logger
streamingManager Manager
}
func (m MockContext) BlockHeight() int64 { return m.height }
func (m MockContext) Logger() log.Logger { return m.logger }
func (m MockContext) StreamingManager() Manager { return m.streamingManager }
func NewMockContext(height int64, logger log.Logger, sm Manager) MockContext {
return MockContext{
baseCtx: context.Background(),
height: height,
logger: logger,
streamingManager: sm,
}
}
func (m MockContext) Deadline() (deadline time.Time, ok bool) {
return m.baseCtx.Deadline()
}
func (m MockContext) Done() <-chan struct{} {
return m.baseCtx.Done()
}
func (m MockContext) Err() error {
return m.baseCtx.Err()
}
func (m MockContext) Value(key any) any {
return m.baseCtx.Value(key)
}
func TestPluginTestSuite(t *testing.T) {
suite.Run(t, new(PluginTestSuite))
}
func (s *PluginTestSuite) TestABCIGRPCPlugin() {
s.T().Run("Should successfully load streaming", func(t *testing.T) {
abciListeners := s.loggerCtx.StreamingManager().Listeners
for _, abciListener := range abciListeners {
for i := range [50]int{} {
err := abciListener.ListenDeliverBlock(s.loggerCtx, s.deliverBlockrequest)
assert.NoError(t, err, "failed to listen to end block: %v", err)
err = abciListener.ListenStateChanges(s.loggerCtx, s.changeSet)
assert.NoError(t, err, "failed to listen to state changes: %v", err)
s.updateHeight(int64(i + 1))
}
}
})
}
func (s *PluginTestSuite) updateHeight(n int64) {
s.loggerCtx = NewMockContext(n, s.loggerCtx.Logger(), s.loggerCtx.StreamingManager())
}
var (
_ context.Context = MockContext{}
_ Context = MockContext{}
)
type MockContext struct {
baseCtx context.Context
height int64
logger log.Logger
streamingManager Manager
}
func (m MockContext) BlockHeight() int64 { return m.height }
func (m MockContext) Logger() log.Logger { return m.logger }
func (m MockContext) StreamingManager() Manager { return m.streamingManager }
func NewMockContext(height int64, logger log.Logger, sm Manager) MockContext {
return MockContext{
baseCtx: context.Background(),
height: height,
logger: logger,
streamingManager: sm,
}
}
func (m MockContext) Deadline() (deadline time.Time, ok bool) {
return m.baseCtx.Deadline()
}
func (m MockContext) Done() <-chan struct{} {
return m.baseCtx.Done()
}
func (m MockContext) Err() error {
return m.baseCtx.Err()
}
func (m MockContext) Value(key any) any {
return m.baseCtx.Value(key)
}

Comment on lines +58 to +75
func getExtensionsNumbers(m proto.Message) []int32 {
gogoProtoExts := gogoproto.RegisteredExtensions(m)

out := make([]int32, 0, len(gogoProtoExts))
for id := range gogoProtoExts {
out = append(out, id)
}
if len(out) != 0 {
return out
}

protoExts := proto.RegisteredExtensions(m) //nolint:staticcheck // kept for backwards compatibility
out = make([]int32, 0, len(protoExts))
for id := range protoExts {
out = append(out, id)
}

return out
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the potential non-determinism introduced by iterating over maps.

+ // Note: Iteration over maps may introduce non-determinism.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func getExtensionsNumbers(m proto.Message) []int32 {
gogoProtoExts := gogoproto.RegisteredExtensions(m)
out := make([]int32, 0, len(gogoProtoExts))
for id := range gogoProtoExts {
out = append(out, id)
}
if len(out) != 0 {
return out
}
protoExts := proto.RegisteredExtensions(m) //nolint:staticcheck // kept for backwards compatibility
out = make([]int32, 0, len(protoExts))
for id := range protoExts {
out = append(out, id)
}
return out
func getExtensionsNumbers(m proto.Message) []int32 {
// Note: Iteration over maps may introduce non-determinism.
gogoProtoExts := gogoproto.RegisteredExtensions(m)
out := make([]int32, 0, len(gogoProtoExts))
for id := range gogoProtoExts {
out = append(out, id)
}
if len(out) != 0 {
return out
}
protoExts := proto.RegisteredExtensions(m) //nolint:staticcheck // kept for backwards compatibility
out = make([]int32, 0, len(protoExts))
for id := range protoExts {
out = append(out, id)
}
return out

Comment on lines +24 to +97
type GRPCServer struct {
logger log.Logger

grpcSrv *grpc.Server
config *Config
}

type GRPCService interface {
// RegisterGRPCServer registers gRPC services directly with the gRPC server.
RegisterGRPCServer(gogogrpc.Server)
}

// New returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server.
func New(logger log.Logger, v *viper.Viper, interfaceRegistry appmanager.InterfaceRegistry, app GRPCService) (GRPCServer, error) {
cfg := DefaultConfig()
if v != nil {
if err := v.Sub(serverName).Unmarshal(&cfg); err != nil {
return GRPCServer{}, fmt.Errorf("failed to unmarshal config: %w", err)
}
}

grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(newProtoCodec(interfaceRegistry).GRPCCodec()),
grpc.MaxSendMsgSize(cfg.MaxSendMsgSize),
grpc.MaxRecvMsgSize(cfg.MaxRecvMsgSize),
)

app.RegisterGRPCServer(grpcSrv)

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

return GRPCServer{
grpcSrv: grpcSrv,
config: cfg,
logger: logger.With(log.ModuleKey, serverName),
}, nil
}

func (g GRPCServer) Name() string {
return serverName
}

func (g GRPCServer) Start(ctx context.Context) error {
listener, err := net.Listen("tcp", g.config.Address)
if err != nil {
return fmt.Errorf("failed to listen on address %s: %w", g.config.Address, err)
}

errCh := make(chan error)

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
g.logger.Info("starting gRPC server...", "address", g.config.Address)
errCh <- g.grpcSrv.Serve(listener)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
err = <-errCh
g.logger.Error("failed to start gRPC server", "err", err)
return err
}

func (g GRPCServer) Stop(ctx context.Context) error {
g.logger.Info("stopping gRPC server...", "address", g.config.Address)
g.grpcSrv.GracefulStop()

return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more logging and error handling in the Start and Stop methods to provide better visibility into the server's lifecycle and potential issues.

Example:

if err := g.grpcSrv.Serve(listener); err != nil {
    g.logger.Error("failed to serve gRPC server", "err", err)
    errCh <- err
}

Comment on lines +107 to +187
type protoCodec struct {
interfaceRegistry appmanager.InterfaceRegistry
}

// newProtoCodec returns a reference to a new ProtoCodec
func newProtoCodec(interfaceRegistry appmanager.InterfaceRegistry) *protoCodec {
return &protoCodec{
interfaceRegistry: interfaceRegistry,
}
}

// Marshal implements BinaryMarshaler.Marshal method.
// NOTE: this function must be used with a concrete type which
// implements proto.Message. For interface please use the codec.MarshalInterface
func (pc *protoCodec) Marshal(o gogoproto.Message) ([]byte, error) {
// Size() check can catch the typed nil value.
if o == nil || gogoproto.Size(o) == 0 {
// return empty bytes instead of nil, because nil has special meaning in places like store.Set
return []byte{}, nil
}

return gogoproto.Marshal(o)
}

// Unmarshal implements BinaryMarshaler.Unmarshal method.
// NOTE: this function must be used with a concrete type which
// implements proto.Message. For interface please use the codec.UnmarshalInterface
func (pc *protoCodec) Unmarshal(bz []byte, ptr gogoproto.Message) error {
err := gogoproto.Unmarshal(bz, ptr)
if err != nil {
return err
}
// err = codectypes.UnpackInterfaces(ptr, pc.interfaceRegistry) // TODO: identify if needed for grpc
// if err != nil {
// return err
// }
return nil
}

func (pc *protoCodec) Name() string {
return "cosmos-sdk-grpc-codec"
}

// GRPCCodec returns the gRPC Codec for this specific ProtoCodec
func (pc *protoCodec) GRPCCodec() encoding.Codec {
return &grpcProtoCodec{cdc: pc}
}

// grpcProtoCodec is the implementation of the gRPC proto codec.
type grpcProtoCodec struct {
cdc appmanager.ProtoCodec
}

var errUnknownProtoType = errors.New("codec: unknown proto type") // sentinel error

func (g grpcProtoCodec) Marshal(v any) ([]byte, error) {
switch m := v.(type) {
case proto.Message:
protov2MarshalOpts := proto.MarshalOptions{Deterministic: true}
return protov2MarshalOpts.Marshal(m)
case gogoproto.Message:
return g.cdc.Marshal(m)
default:
return nil, fmt.Errorf("%w: cannot marshal type %T", errUnknownProtoType, v)
}
}

func (g grpcProtoCodec) Unmarshal(data []byte, v any) error {
switch m := v.(type) {
case proto.Message:
return proto.Unmarshal(data, m)
case gogoproto.Message:
return g.cdc.Unmarshal(data, m)
default:
return fmt.Errorf("%w: cannot unmarshal type %T", errUnknownProtoType, v)
}
}

func (g grpcProtoCodec) Name() string {
return "cosmos-sdk-grpc-codec"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more logging and error handling in the Marshal and Unmarshal methods to provide better visibility into the encoding and decoding process.

Example:

if err := gogoproto.Unmarshal(bz, ptr); err != nil {
    pc.logger.Error("failed to unmarshal protobuf message", "err", err)
    return err
}

Comment on lines +155 to +187
// grpcProtoCodec is the implementation of the gRPC proto codec.
type grpcProtoCodec struct {
cdc appmanager.ProtoCodec
}

var errUnknownProtoType = errors.New("codec: unknown proto type") // sentinel error

func (g grpcProtoCodec) Marshal(v any) ([]byte, error) {
switch m := v.(type) {
case proto.Message:
protov2MarshalOpts := proto.MarshalOptions{Deterministic: true}
return protov2MarshalOpts.Marshal(m)
case gogoproto.Message:
return g.cdc.Marshal(m)
default:
return nil, fmt.Errorf("%w: cannot marshal type %T", errUnknownProtoType, v)
}
}

func (g grpcProtoCodec) Unmarshal(data []byte, v any) error {
switch m := v.(type) {
case proto.Message:
return proto.Unmarshal(data, m)
case gogoproto.Message:
return g.cdc.Unmarshal(data, m)
default:
return fmt.Errorf("%w: cannot unmarshal type %T", errUnknownProtoType, v)
}
}

func (g grpcProtoCodec) Name() string {
return "cosmos-sdk-grpc-codec"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more logging and error handling in the Marshal and Unmarshal methods to provide better visibility into the encoding and decoding process.

Example:

if err := proto.Unmarshal(data, m); err != nil {
    g.logger.Error("failed to unmarshal gRPC message", "err", err)
    return err
}

Comment on lines +38 to +123
// DisplayableSink is an interface that defines a method for displaying metrics.
type DisplayableSink interface {
DisplayMetrics(resp http.ResponseWriter, req *http.Request) (any, error)
}

// Metrics defines a wrapper around application telemetry functionality. It allows
// metrics to be gathered at any point in time. When creating a Metrics object,
// internally, a global metrics is registered with a set of sinks as configured
// by the operator. In addition to the sinks, when a process gets a SIGUSR1, a
// dump of formatted recent metrics will be sent to STDERR.
type Metrics struct {
sink metrics.MetricSink
prometheusEnabled bool
}

// GatherResponse is the response type of registered metrics
type GatherResponse struct {
Metrics []byte
ContentType string
}

// New creates a new instance of Metrics
func New(cfg Config) (_ *Metrics, rerr error) {
if !cfg.Enabled {
return nil, nil
}

if numGlobalLabels := len(cfg.GlobalLabels); numGlobalLabels > 0 {
parsedGlobalLabels := make([]metrics.Label, numGlobalLabels)
for i, gl := range cfg.GlobalLabels {
parsedGlobalLabels[i] = NewLabel(gl[0], gl[1])
}
GlobalLabels = parsedGlobalLabels
}

metricsConf := metrics.DefaultConfig(cfg.ServiceName)
metricsConf.EnableHostname = cfg.EnableHostname
metricsConf.EnableHostnameLabel = cfg.EnableHostnameLabel

var (
sink metrics.MetricSink
err error
)
switch cfg.MetricsSink {
case MetricSinkStatsd:
sink, err = metrics.NewStatsdSink(cfg.StatsdAddr)
case MetricSinkDogsStatsd:
sink, err = datadog.NewDogStatsdSink(cfg.StatsdAddr, cfg.DatadogHostname)
default:
memSink := metrics.NewInmemSink(10*time.Second, time.Minute)
sink = memSink
inMemSig := metrics.DefaultInmemSignal(memSink)
defer func() {
if rerr != nil {
inMemSig.Stop()
}
}()
}

if err != nil {
return nil, err
}

m := &Metrics{sink: sink}
fanout := metrics.FanoutSink{sink}

if cfg.PrometheusRetentionTime > 0 {
m.prometheusEnabled = true
prometheusOpts := metricsprom.PrometheusOpts{
Expiration: time.Duration(cfg.PrometheusRetentionTime) * time.Second,
}

promSink, err := metricsprom.NewPrometheusSinkFrom(prometheusOpts)
if err != nil {
return nil, err
}

fanout = append(fanout, promSink)
}

if _, err := metrics.NewGlobal(metricsConf, fanout); err != nil {
return nil, err
}

return m, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more logging and error handling in the New and Gather methods to provide better visibility into the telemetry initialization and gathering process.

Example:

if err := metrics.NewGlobal(metricsConf, fanout); err != nil {
    m.logger.Error("failed to create global metrics", "err", err)
    return nil, err
}

Comment on lines +144 to +168
// gatherPrometheus collects Prometheus metrics and returns a GatherResponse.
// If Prometheus metrics are not enabled, it returns an error.
func (m *Metrics) gatherPrometheus() (GatherResponse, error) {
if !m.prometheusEnabled {
return GatherResponse{}, fmt.Errorf("prometheus metrics are not enabled")
}

metricsFamilies, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return GatherResponse{}, fmt.Errorf("failed to gather prometheus metrics: %w", err)
}

buf := &bytes.Buffer{}
defer buf.Reset()

e := expfmt.NewEncoder(buf, expfmt.NewFormat(expfmt.TypeTextPlain))

for _, mf := range metricsFamilies {
if err := e.Encode(mf); err != nil {
return GatherResponse{}, fmt.Errorf("failed to encode prometheus metrics: %w", err)
}
}

return GatherResponse{ContentType: ContentTypeText, Metrics: buf.Bytes()}, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more logging and error handling in the gatherPrometheus method to provide better visibility into the metrics gathering process.

Example:

if err := e.Encode(mf); err != nil {
    m.logger.Error("failed to encode prometheus metrics", "err", err)
    return GatherResponse{}, fmt.Errorf("failed to encode prometheus metrics: %w", err)
}

Comment on lines +170 to +188
// gatherGeneric collects generic metrics and returns a GatherResponse.
func (m *Metrics) gatherGeneric() (GatherResponse, error) {
gm, ok := m.sink.(DisplayableSink)
if !ok {
return GatherResponse{}, fmt.Errorf("non in-memory metrics sink does not support generic format")
}

summary, err := gm.DisplayMetrics(nil, nil)
if err != nil {
return GatherResponse{}, fmt.Errorf("failed to gather in-memory metrics: %w", err)
}

content, err := json.Marshal(summary)
if err != nil {
return GatherResponse{}, fmt.Errorf("failed to encode in-memory metrics: %w", err)
}

return GatherResponse{ContentType: "application/json", Metrics: content}, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding more logging and error handling in the gatherGeneric method to provide better visibility into the metrics gathering process.

Example:

if err := json.Marshal(summary); err != nil {
    m.logger.Error("failed to encode in-memory metrics", "err", err)
    return GatherResponse{}, fmt.Errorf("failed to encode in-memory metrics: %w", err)
}

Copy link
Member

@julienrbrt julienrbrt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm, we should add a CI job for this (for when we add tests)

func DefaultConfig() *Config {
return &Config{
Enable: true,
// DefaultGRPCAddress defines the default address to bind the gRPC server to.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those comments don't really make sense anymore

Copy link
Member

@kocubinski kocubinski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, since it's a pretty mechanical movement of server/v1 code -> v2 for streaming.

On the server/v2 go.mod, this is already in progress testing on the simapp/v2 branch, so I'm confident with it as is for now as it's in active development.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review Details

Configuration used: .coderabbit.yml
Review profile: CHILL

Commits Files that changed from the base of the PR and between ed638cd and da38aff.
Files selected for processing (1)
  • simapp/gomod2nix.toml (2 hunks)
Files skipped from review as they are similar to previous changes (1)
  • simapp/gomod2nix.toml

@kocubinski kocubinski mentioned this pull request May 16, 2024
12 tasks
@tac0turtle tac0turtle added this pull request to the merge queue May 17, 2024
Merged via the queue into main with commit d7cc6de May 17, 2024
69 checks passed
@tac0turtle tac0turtle deleted the marko/upstream_more_v2 branch May 17, 2024 11:02
alpe added a commit that referenced this pull request May 23, 2024
* main: (95 commits)
  fix(x/accounts): check for overflows in multisig weights and votes (#20384)
  docs(x/account/auth): Improve error handling and comments in fee.go (#20426)
  docs: fix some markdown syntax (#20432)
  revert: bank change module to account change (#20427)
  fix: nil pointer panic when store don't exists in historical version (#20425)
  fix(store/v2): Remove should not error on miss (#20423)
  chore: upstream more changes from v2 (#20387)
  docs(x/auth/ante): fixed typo  in TxWithTimeoutHeight interface name (#20418)
  fix: avoid default sendenabled for module accounts (#20419)
  docs(x/auth): fixed typo in command example for multisign transaction (#20417)
  build(deps): Bump bufbuild/buf-setup-action from 1.31.0 to 1.32.0 (#20413)
  build(deps): Bump github.com/hashicorp/go-plugin from 1.6.0 to 1.6.1 in /store (#20414)
  feat(x/accounts): Add schema caching feature and corresponding test case (#20055)
  refactor(runtime/v2): remove dependency on sdk (#20389)
  refactor!: turn MsgsV2 into ReflectMessages to make it less confusing (#19839)
  docs: Enhanced the ParsePagination method documentation (#20385)
  refactor(runtime,core): split router service (#20401)
  chore: fix spelling errors (#20400)
  docs: Documented error handling in OfferSnapshot method (#20380)
  build(deps): Bump google.golang.org/grpc from 1.63.2 to 1.64.0 (#20390)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants