Skip to content

Commit

Permalink
Allow relay protos to be larger. (#1174)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@eigenlabs.org>
  • Loading branch information
cody-littley authored Jan 28, 2025
1 parent fd4444f commit 91f7e6e
Show file tree
Hide file tree
Showing 15 changed files with 75 additions and 37 deletions.
3 changes: 2 additions & 1 deletion api/clients/v2/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clients
import (
"context"
"fmt"
"github.com/docker/go-units"
"sync"

"github.com/Layr-Labs/eigenda/api"
Expand Down Expand Up @@ -314,7 +315,7 @@ func (c *disperserClient) initOnceGrpcConnection() error {
var initErr error
c.initOnceGrpc.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, 4*units.MiB)
conn, err := grpc.NewClient(addr, dialOptions...)
if err != nil {
initErr = err
Expand Down
3 changes: 2 additions & 1 deletion api/clients/v2/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/Layr-Labs/eigenda/api"
"github.com/docker/go-units"
"sync"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
Expand Down Expand Up @@ -118,7 +119,7 @@ func (c *nodeClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, 4*units.MiB)
conn, err := grpc.NewClient(addr, dialOptions...)
if err != nil {
initErr = err
Expand Down
19 changes: 12 additions & 7 deletions api/clients/v2/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
type MessageSigner func(ctx context.Context, data [32]byte) (*core.Signature, error)

type RelayClientConfig struct {
Sockets map[corev2.RelayKey]string
UseSecureGrpcFlag bool
OperatorID *core.OperatorID
MessageSigner MessageSigner
Sockets map[corev2.RelayKey]string
UseSecureGrpcFlag bool
MaxGRPCMessageSize uint
OperatorID *core.OperatorID
MessageSigner MessageSigner
}

type ChunkRequestByRange struct {
Expand Down Expand Up @@ -71,8 +72,12 @@ var _ RelayClient = (*relayClient)(nil)
// NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated.
func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (RelayClient, error) {
if config == nil || len(config.Sockets) <= 0 {
return nil, fmt.Errorf("invalid config: %v", config)
if config == nil {
return nil, errors.New("nil config")
} else if len(config.Sockets) <= 0 {
return nil, errors.New("no relay sockets provided")
} else if config.MaxGRPCMessageSize == 0 {
return nil, errors.New("max gRPC message size must be greater than 0")
}

logger.Info("creating relay client", "urls", config.Sockets)
Expand Down Expand Up @@ -241,7 +246,7 @@ func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
initErr = fmt.Errorf("unknown relay key: %v", key)
return
}
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag, c.config.MaxGRPCMessageSize)
conn, err := grpc.NewClient(socket, dialOptions...)
if err != nil {
initErr = err
Expand Down
6 changes: 5 additions & 1 deletion api/clients/v2/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

func getGrpcDialOptions(useSecureGrpcFlag bool) []grpc.DialOption {
// getGrpcDialOptions builds the gRPC dial options based on the useSecureGrpcFlag and maxMessageSize.
func getGrpcDialOptions(useSecureGrpcFlag bool, maxMessageSize uint) []grpc.DialOption {
options := []grpc.DialOption{}
if useSecureGrpcFlag {
options = append(options, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
} else {
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

options = append(options, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(maxMessageSize))))

return options
}
4 changes: 3 additions & 1 deletion inabox/tests/integration_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/docker/go-units"
"math/big"
"time"

Expand Down Expand Up @@ -207,7 +208,8 @@ var _ = Describe("Inabox v2 Integration", func() {

// Test retrieval from relay
relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relays,
Sockets: relays,
MaxGRPCMessageSize: units.GiB,
}, logger)
Expect(err).To(BeNil())

Expand Down
2 changes: 2 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Config struct {
EnableGnarkBundleEncoding bool
ClientIPHeader string
UseSecureGrpc bool
RelayMaxMessageSize uint
ReachabilityPollIntervalSec uint64
DisableNodeInfoResources bool

Expand Down Expand Up @@ -292,6 +293,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
ClientIPHeader: ctx.GlobalString(flags.ClientIPHeaderFlag.Name),
UseSecureGrpc: ctx.GlobalBoolT(flags.ChurnerUseSecureGRPC.Name),
RelayMaxMessageSize: uint(ctx.GlobalInt(flags.RelayMaxGRPCMessageSizeFlag.Name)),
DisableNodeInfoResources: ctx.GlobalBool(flags.DisableNodeInfoResourcesFlag.Name),
BlsSignerConfig: blsSignerConfig,
EnableV2: v2Enabled,
Expand Down
13 changes: 11 additions & 2 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flags

import (
"github.com/docker/go-units"
"time"

"github.com/Layr-Labs/eigenda/common"
Expand Down Expand Up @@ -249,7 +250,7 @@ var (
Usage: "The maximum message size in bytes the V2 dispersal endpoint can receive from the client. This flag is only relevant in v2 (default: 1MB)",
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "GRPC_MSG_SIZE_LIMIT_V2"),
Value: 1024 * 1024,
Value: units.MiB,
}
DisableDispersalAuthenticationFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "disable-dispersal-authentication"),
Expand All @@ -262,7 +263,7 @@ var (
Usage: "The size of the dispersal authentication key cache",
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DISPERSAL_AUTHENTICATION_KEY_CACHE_SIZE"),
Value: 1024,
Value: units.KiB,
}
DisperserKeyTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "disperser-key-timeout"),
Expand All @@ -278,6 +279,13 @@ var (
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DISPERSAL_AUTHENTICATION_TIMEOUT"),
Value: time.Minute,
}
RelayMaxGRPCMessageSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "relay-max-grpc-message-size"),
Usage: "The maximum message size in bytes for messages received from the relay",
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "RELAY_MAX_GRPC_MESSAGE_SIZE"),
Value: units.GiB, // intentionally large for the time being
}

// Test only, DO NOT USE the following flags in production

Expand Down Expand Up @@ -439,6 +447,7 @@ var optionalFlags = []cli.Flag{
DispersalAuthenticationKeyCacheSizeFlag,
DisperserKeyTimeoutFlag,
DispersalAuthenticationTimeoutFlag,
RelayMaxGRPCMessageSizeFlag,
}

func init() {
Expand Down
2 changes: 2 additions & 0 deletions node/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc_test
import (
"context"
"fmt"
"github.com/docker/go-units"
"net"
"os"
"runtime"
Expand Down Expand Up @@ -88,6 +89,7 @@ func makeConfig(t *testing.T) *node.Config {
NumBatchValidators: runtime.GOMAXPROCS(0),
EnableV2: false,
DisableDispersalAuthentication: true,
RelayMaxMessageSize: units.GiB,
}
}

Expand Down
18 changes: 10 additions & 8 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,11 @@ func NewNode(

logger.Info("Creating relay client", "relayURLs", relayURLs)
relayClient, err = clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relayURLs,
UseSecureGrpcFlag: config.UseSecureGrpc,
OperatorID: &config.ID,
MessageSigner: n.SignMessage,
Sockets: relayURLs,
UseSecureGrpcFlag: config.UseSecureGrpc,
OperatorID: &config.ID,
MessageSigner: n.SignMessage,
MaxGRPCMessageSize: n.Config.RelayMaxMessageSize,
}, logger)

if err != nil {
Expand Down Expand Up @@ -422,10 +423,11 @@ func (n *Node) RefreshOnchainState(ctx context.Context) error {
}

relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relayURLs,
UseSecureGrpcFlag: n.Config.UseSecureGrpc,
OperatorID: &n.Config.ID,
MessageSigner: n.SignMessage,
Sockets: relayURLs,
UseSecureGrpcFlag: n.Config.UseSecureGrpc,
OperatorID: &n.Config.ID,
MessageSigner: n.SignMessage,
MaxGRPCMessageSize: n.Config.RelayMaxMessageSize,
}, n.Logger)
if err != nil {
n.Logger.Error("error creating relay client", "err", err)
Expand Down
2 changes: 2 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node_test
import (
"context"
"errors"
"github.com/docker/go-units"
"os"
"runtime"
"testing"
Expand Down Expand Up @@ -56,6 +57,7 @@ func newComponents(t *testing.T, operatorID [32]byte) *components {
EnableNodeApi: false,
EnableMetrics: false,
RegisterNodeAtStart: false,
RelayMaxMessageSize: units.GiB,
}
loggerConfig := common.DefaultLoggerConfig()
logger, err := common.NewLogger(loggerConfig)
Expand Down
8 changes: 5 additions & 3 deletions node/node_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node_test
import (
"context"
"fmt"
"github.com/docker/go-units"
"testing"
"time"

Expand Down Expand Up @@ -257,9 +258,10 @@ func TestRefreshOnchainStateSuccess(t *testing.T) {
}

relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relayURLs,
OperatorID: &c.node.Config.ID,
MessageSigner: messageSigner,
Sockets: relayURLs,
OperatorID: &c.node.Config.ID,
MessageSigner: messageSigner,
MaxGRPCMessageSize: units.GiB,
}, c.node.Logger)
require.NoError(t, err)
// set up non-mock client
Expand Down
21 changes: 11 additions & 10 deletions relay/cmd/flags/flags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flags

import (
"github.com/docker/go-units"
"time"

"github.com/Layr-Labs/eigenda/common"
Expand Down Expand Up @@ -45,14 +46,14 @@ var (
Usage: "Max size of a gRPC message in bytes",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GRPC_MESSAGE_SIZE"),
Value: 1024 * 1024 * 300,
Value: 4 * units.MiB,
}
MetadataCacheSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metadata-cache-size"),
Usage: "Max number of items in the metadata cache",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "METADATA_CACHE_SIZE"),
Value: 1024 * 1024,
Value: units.MiB,
}
MetadataMaxConcurrencyFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metadata-max-concurrency"),
Expand All @@ -66,7 +67,7 @@ var (
Usage: "The size of the blob cache, in bytes.",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "BLOB_CACHE_SIZE"),
Value: 1024 * 1024 * 1024,
Value: 8 * units.GiB,
}
BlobMaxConcurrencyFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "blob-max-concurrency"),
Expand All @@ -80,7 +81,7 @@ var (
Usage: "Size of the chunk cache, in bytes.",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "CHUNK_CACHE_BYTES"),
Value: 1024 * 1024 * 1024,
Value: units.GiB,
}
ChunkMaxConcurrencyFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "chunk-max-concurrency"),
Expand Down Expand Up @@ -115,14 +116,14 @@ var (
Usage: "Max bandwidth for GetBlob operations in bytes per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_BLOB_BYTES_PER_SECOND"),
Value: 20 * 1024 * 1024,
Value: 20 * units.MiB,
}
GetBlobBytesBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-blob-bytes-burstiness"),
Usage: "Burstiness of the GetBlob bandwidth rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_BLOB_BYTES_BURSTINESS"),
Value: 20 * 1024 * 1024,
Value: 20 * units.MiB,
}
MaxConcurrentGetBlobOpsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-blob-ops"),
Expand Down Expand Up @@ -150,14 +151,14 @@ var (
Usage: "Max bandwidth for GetChunk operations in bytes per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_BYTES_PER_SECOND"),
Value: 80 * 1024 * 1024,
Value: 80 * units.MiB,
}
GetChunkBytesBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-bytes-burstiness"),
Usage: "Burstiness of the GetChunk bandwidth rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_BYTES_BURSTINESS"),
Value: 800 * 1024 * 1024,
Value: 800 * units.MiB,
}
MaxConcurrentGetChunkOpsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-chunk-ops"),
Expand Down Expand Up @@ -185,14 +186,14 @@ var (
Usage: "Max bandwidth for GetChunk operations in bytes per second per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_BYTES_PER_SECOND_CLIENT"),
Value: 40 * 1024 * 1024,
Value: 40 * units.MiB,
}
GetChunkBytesBurstinessClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-bytes-burstiness-client"),
Usage: "Burstiness of the GetChunk bandwidth rate limiter per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_BYTES_BURSTINESS_CLIENT"),
Value: 400 * 1024 * 1024,
Value: 400 * units.MiB,
}
MaxConcurrentGetChunkOpsClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-chunk-ops-client"),
Expand Down
3 changes: 2 additions & 1 deletion relay/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package relay
import (
"context"
"encoding/binary"
"github.com/docker/go-units"
"testing"
"time"

Expand All @@ -26,7 +27,7 @@ import (
func defaultConfig() *Config {
return &Config{
GRPCPort: 50051,
MaxGRPCMessageSize: 1024 * 1024 * 300,
MaxGRPCMessageSize: units.MB,
MetadataCacheSize: 1024 * 1024,
MetadataMaxConcurrency: 32,
BlobCacheBytes: 1024 * 1024,
Expand Down
2 changes: 2 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/docker/go-units"
"log"
"math"
"math/big"
Expand Down Expand Up @@ -367,6 +368,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging
QuorumIDList: registeredQuorums,
DispersalAuthenticationKeyCacheSize: 1024,
DisableDispersalAuthentication: false,
RelayMaxMessageSize: units.GiB,
}

// creating a new instance of encoder instead of sharing enc because enc is not thread safe
Expand Down
Loading

0 comments on commit 91f7e6e

Please sign in to comment.