Skip to content

Commit

Permalink
relay client
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 8, 2024
1 parent d1603f2 commit c6da8ce
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 300 deletions.
40 changes: 40 additions & 0 deletions api/clients/mock/relay_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package mock

import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)

type MockRelayClient struct {
mock.Mock
}

var _ clients.RelayClient = (*MockRelayClient)(nil)

func NewRelayClient() *MockRelayClient {
return &MockRelayClient{}
}

func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
args := c.Called(blobKey)
return args.Get(0).([]byte), args.Error(1)
}

func (c *MockRelayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByRange) ([]core.Bundle, error) {
args := c.Called()
return args.Get(0).([]core.Bundle), args.Error(1)
}

func (c *MockRelayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByIndex) ([]core.Bundle, error) {
args := c.Called()
return args.Get(0).([]core.Bundle), args.Error(1)
}

func (c *MockRelayClient) Close() error {
args := c.Called()
return args.Error(0)
}
3 changes: 1 addition & 2 deletions api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/node"
"github.com/wealdtech/go-merkletree/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (c client) GetBlobHeader(
return nil, nil, err
}

blobHeader, err := node.GetBlobHeaderFromProto(reply.GetBlobHeader())
blobHeader, err := core.BlobHeaderFromProto(reply.GetBlobHeader())
if err != nil {
return nil, nil, err
}
Expand Down
214 changes: 214 additions & 0 deletions api/clients/relay_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package clients

import (
"context"
"fmt"
"sync"

relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
)

type RelayClientConfig struct {
Sockets map[corev2.RelayKey]string
UseSecureGrpcFlag bool
}

type ChunkRequestByRange struct {
BlobKey corev2.BlobKey
Start uint32
End uint32
}

type ChunkRequestByIndex struct {
BlobKey corev2.BlobKey
Indexes []uint32
}

type RelayClient interface {
// GetBlob retrieves a blob from a relay
GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error)
// GetChunksByRange retrieves blob chunks from a relay by chunk index range
// The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request.
GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByRange) ([]core.Bundle, error)
// GetChunksByIndex retrieves blob chunks from a relay by index
// The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request.
GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([]core.Bundle, error)
Close() error
}

type relayClient struct {
config *RelayClientConfig

initOnce map[corev2.RelayKey]*sync.Once
conns map[corev2.RelayKey]*grpc.ClientConn
logger logging.Logger

grpcClients map[corev2.RelayKey]relaygrpc.RelayClient
}

var _ RelayClient = (*relayClient)(nil)

// NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated.
func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayClient, error) {
if config == nil || len(config.Sockets) > 0 {
return nil, fmt.Errorf("invalid config: %v", config)
}

initOnce := make(map[corev2.RelayKey]*sync.Once)
conns := make(map[corev2.RelayKey]*grpc.ClientConn)
grpcClients := make(map[corev2.RelayKey]relaygrpc.RelayClient)
for key := range config.Sockets {
initOnce[key] = &sync.Once{}
}
return &relayClient{
config: config,

initOnce: initOnce,
conns: conns,
logger: logger,

grpcClients: grpcClients,
}, nil
}

func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

res, err := client.GetBlob(ctx, &relaygrpc.GetBlobRequest{
BlobKey: blobKey[:],
})
if err != nil {
return nil, err
}

return res.GetBlob(), nil
}

func (c *relayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByRange) ([]core.Bundle, error) {
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
for i, req := range requests {
grpcRequests[i] = &relaygrpc.ChunkRequest{
Request: &relaygrpc.ChunkRequest_ByRange{
ByRange: &relaygrpc.ChunkRequestByRange{
BlobKey: req.BlobKey[:],
StartIndex: req.Start,
EndIndex: req.End,
},
},
}
}
res, err := client.GetChunks(ctx, &relaygrpc.GetChunksRequest{
ChunkRequests: grpcRequests,
})

if err != nil {
return nil, err
}

bundles := make([]core.Bundle, len(res.GetData()))
for i, chunks := range res.GetData() {
bundles[i] = encoding.FramesFromProtobuf(chunks.GetData())
}
return bundles, nil
}

func (c *relayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([]core.Bundle, error) {
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
for i, req := range requests {
grpcRequests[i] = &relaygrpc.ChunkRequest{
Request: &relaygrpc.ChunkRequest_ByIndex{
ByIndex: &relaygrpc.ChunkRequestByIndex{
BlobKey: req.BlobKey[:],
ChunkIndices: req.Indexes,
},
},
}
}
res, err := client.GetChunks(ctx, &relaygrpc.GetChunksRequest{
ChunkRequests: grpcRequests,
})

if err != nil {
return nil, err
}

bundles := make([]core.Bundle, len(res.GetData()))
for i, chunks := range res.GetData() {
bundles[i] = encoding.FramesFromProtobuf(chunks.GetData())
}
return bundles, nil
}

func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
var initErr error
c.initOnce[key].Do(func() {
socket, ok := c.config.Sockets[key]
if !ok {
initErr = fmt.Errorf("unknown relay key: %v", key)
return
}
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
conn, err := grpc.Dial(socket, dialOptions...)
if err != nil {
initErr = err
return
}
c.conns[key] = conn
c.grpcClients[key] = relaygrpc.NewRelayClient(conn)
})
return initErr
}

func (c *relayClient) Close() error {
var errList *multierror.Error
for k, conn := range c.conns {
if conn != nil {
err := conn.Close()
conn = nil
c.grpcClients[k] = nil
if err != nil {
c.logger.Error("failed to close connection", "err", err)
errList = multierror.Append(errList, err)
}
}
}
return errList.ErrorOrNil()
}
Loading

0 comments on commit c6da8ce

Please sign in to comment.