Skip to content

Commit

Permalink
feat(shrex/eds): Client/Server (#1431)
Browse files Browse the repository at this point in the history
Co-authored-by: Viacheslav <viacheslavgonkivskyi@gmail.com>

Closes #1416
Closes #1417
Closes #1418
  • Loading branch information
distractedm1nd authored Dec 21, 2022
1 parent ff95cf2 commit 2a02f2f
Show file tree
Hide file tree
Showing 7 changed files with 1,047 additions and 0 deletions.
20 changes: 20 additions & 0 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"

"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/host"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
)

func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option {
Expand Down Expand Up @@ -48,6 +51,23 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return fx.Module(
"share",
baseComponents,
fx.Provide(fx.Annotate(
func(host host.Host, store *eds.Store, network modp2p.Network) (*shrexeds.Server, error) {
return shrexeds.NewServer(host, store, shrexeds.WithProtocolSuffix(string(network)))
},
fx.OnStart(func(ctx context.Context, server *shrexeds.Server) error {
return server.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, server *shrexeds.Server) error {
return server.Stop(ctx)
}),
)),
// Bridge Nodes need a client as well, for requests over FullAvailability
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexeds.Client, error) {
return shrexeds.NewClient(host, shrexeds.WithProtocolSuffix(string(network)))
},
),
fx.Provide(fx.Annotate(
func(path node.StorePath, ds datastore.Batching) (*eds.Store, error) {
return eds.NewStore(string(path), ds)
Expand Down
147 changes: 147 additions & 0 deletions share/p2p/shrexeds/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package shrexeds

import (
"context"
"errors"
"fmt"
"net"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
p2p_pb "github.com/celestiaorg/celestia-node/share/p2p/shrexeds/pb"
"github.com/celestiaorg/go-libp2p-messenger/serde"
"github.com/celestiaorg/rsmt2d"
)

var errNoMorePeers = errors.New("all peers returned invalid responses")

// Client is responsible for requesting EDSs for blocksync over the ShrEx/EDS protocol.
type Client struct {
protocolID protocol.ID
host host.Host
}

// NewClient creates a new ShrEx/EDS client.
func NewClient(host host.Host, opts ...Option) (*Client, error) {
params := DefaultParameters()
for _, opt := range opts {
opt(params)
}

if err := params.Validate(); err != nil {
return nil, fmt.Errorf("shrex-eds: client creation failed: %w", err)
}

return &Client{
host: host,
protocolID: protocolID(params.protocolSuffix),
}, nil
}

// RequestEDS requests the full ODS from one of the given peers and returns the EDS.
//
// The peers are requested in a round-robin manner with retries until one of them gives a valid
// response, blocking until the context is canceled or a valid response is given.
func (c *Client) RequestEDS(
ctx context.Context,
dataHash share.DataHash,
peers peer.IDSlice,
) (*rsmt2d.ExtendedDataSquare, error) {
req := &p2p_pb.EDSRequest{Hash: dataHash}

// requests are retried for every peer until a valid response is received
excludedPeers := make(map[peer.ID]struct{})
for {
// if no peers are left, return
if len(peers) == len(excludedPeers) {
return nil, errNoMorePeers
}

for _, to := range peers {
// skip over excluded peers
if _, ok := excludedPeers[to]; ok {
continue
}
eds, err := c.doRequest(ctx, req, to)
if eds != nil {
return eds, err
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return nil, ctx.Err()
}
// some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not
// unwrap to a ctx err
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
return nil, context.DeadlineExceeded
}
if err != nil {
// peer has misbehaved, exclude them from round-robin
excludedPeers[to] = struct{}{}
log.Errorw("client: eds request to peer failed", "peer", to, "hash", dataHash.String())
}

// no eds was found, continue
}
}
}

func (c *Client) doRequest(
ctx context.Context,
req *p2p_pb.EDSRequest,
to peer.ID,
) (*rsmt2d.ExtendedDataSquare, error) {
dataHash := share.DataHash(req.Hash)
log.Debugf("client: requesting eds %s from peer %s", dataHash.String(), to)
stream, err := c.host.NewStream(ctx, to, c.protocolID)
if err != nil {
return nil, fmt.Errorf("failed to open stream: %w", err)
}
if dl, ok := ctx.Deadline(); ok {
if err = stream.SetDeadline(dl); err != nil {
log.Debugw("error setting deadline: %s", err)
}
}

// request ODS
_, err = serde.Write(stream, req)
if err != nil {
stream.Reset() //nolint:errcheck
return nil, fmt.Errorf("failed to write request to stream: %w", err)
}
err = stream.CloseWrite()
if err != nil {
stream.Reset() //nolint:errcheck
return nil, fmt.Errorf("failed to close write on stream: %w", err)
}

// read and parse status from peer
resp := new(p2p_pb.EDSResponse)
_, err = serde.Read(stream, resp)
if err != nil {
stream.Reset() //nolint:errcheck
return nil, fmt.Errorf("failed to read status from stream: %w", err)
}

switch resp.Status {
case p2p_pb.Status_OK:
// use header and ODS bytes to construct EDS and verify it against dataHash
eds, err := eds.ReadEDS(ctx, stream, dataHash)
if err != nil {
return nil, fmt.Errorf("failed to read eds from ods bytes: %w", err)
}
return eds, nil
case p2p_pb.Status_NOT_FOUND, p2p_pb.Status_REFUSED:
log.Debugf("client: peer %s couldn't serve eds %s with status %s", to.String(), dataHash.String(), resp.GetStatus())
// no eds was returned, but the request was valid and should be retried
return nil, nil
case p2p_pb.Status_INVALID:
fallthrough
default:
return nil, fmt.Errorf("request status %s returned for root %s", resp.GetStatus(), dataHash.String())
}
}
115 changes: 115 additions & 0 deletions share/p2p/shrexeds/exchange_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package shrexeds

import (
"context"
"testing"
"time"

"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
libhost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
)

func TestExchange_RequestEDS(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
store, client, server := makeExchange(t)

err := store.Start(ctx)
require.NoError(t, err)

err = server.Start(ctx)
require.NoError(t, err)

// Testcase: EDS is immediately available
t.Run("EDS_Available", func(t *testing.T) {
eds := share.RandEDS(t, 4)
dah := da.NewDataAvailabilityHeader(eds)
err = store.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

requestedEDS, err := client.RequestEDS(ctx, dah.Hash(), []peer.ID{server.host.ID()})
assert.NoError(t, err)
assert.Equal(t, eds.Flattened(), requestedEDS.Flattened())

})

// Testcase: EDS is unavailable initially, but is found after multiple requests
t.Run("EDS_AvailableAfterDelay", func(t *testing.T) {
storageDelay := time.Second
eds := share.RandEDS(t, 4)
dah := da.NewDataAvailabilityHeader(eds)
go func() {
time.Sleep(storageDelay)
err = store.Put(ctx, dah.Hash(), eds)
// require.NoError(t, err)
}()

now := time.Now()
requestedEDS, err := client.RequestEDS(ctx, dah.Hash(), []peer.ID{server.host.ID()})
finished := time.Now()

assert.Greater(t, finished.Sub(now), storageDelay)
assert.NoError(t, err)
assert.Equal(t, eds.Flattened(), requestedEDS.Flattened())
})

// Testcase: Invalid request excludes peer from round-robin, stopping request
t.Run("EDS_InvalidRequest", func(t *testing.T) {
dataHash := []byte("invalid")
requestedEDS, err := client.RequestEDS(ctx, dataHash, []peer.ID{server.host.ID()})
assert.ErrorIs(t, err, errNoMorePeers)
assert.Nil(t, requestedEDS)
})

// Testcase: Valid request, which server cannot serve, waits forever
t.Run("EDS_ValidTimeout", func(t *testing.T) {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)
eds := share.RandEDS(t, 4)
dah := da.NewDataAvailabilityHeader(eds)
requestedEDS, err := client.RequestEDS(timeoutCtx, dah.Hash(), []peer.ID{server.host.ID()})
assert.ErrorIs(t, err, timeoutCtx.Err())
assert.Nil(t, requestedEDS)
})
}

func newStore(t *testing.T) *eds.Store {
t.Helper()

tmpDir := t.TempDir()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
store, err := eds.NewStore(tmpDir, ds)
require.NoError(t, err)
return store
}

func createMocknet(t *testing.T, amount int) []libhost.Host {
t.Helper()

net, err := mocknet.FullMeshConnected(amount)
require.NoError(t, err)
// get host and peer
return net.Hosts()
}

func makeExchange(t *testing.T) (*eds.Store, *Client, *Server) {
t.Helper()
store := newStore(t)
hosts := createMocknet(t, 2)

client, err := NewClient(hosts[0])
require.NoError(t, err)
server, err := NewServer(hosts[1], store)
require.NoError(t, err)

return store, client, server
}
74 changes: 74 additions & 0 deletions share/p2p/shrexeds/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package shrexeds

import (
"fmt"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/protocol"
)

const protocolPrefix = "/shrex/eds/v0.0.1/"

var log = logging.Logger("shrex-eds")

// Option is the functional option that is applied to the shrex/eds protocol to configure its
// parameters.
type Option func(*Parameters)

// Parameters is the set of parameters that must be configured for the shrex/eds protocol.
type Parameters struct {
// ReadDeadline sets the timeout for reading messages from the stream.
ReadDeadline time.Duration

// WriteDeadline sets the timeout for writing messages to the stream.
WriteDeadline time.Duration

// ReadCARDeadline defines the deadline for reading a CAR from disk.
ReadCARDeadline time.Duration

// BufferSize defines the size of the buffer used for writing an ODS over the stream.
BufferSize uint64

// protocolSuffix is appended to the protocolID and represents the network the protocol is
// running on.
protocolSuffix string
}

func DefaultParameters() *Parameters {
return &Parameters{
ReadDeadline: time.Minute,
WriteDeadline: time.Second * 5,
ReadCARDeadline: time.Minute,
BufferSize: 32 * 1024,
}
}

const errSuffix = "value should be positive and non-zero"

func (p *Parameters) Validate() error {
if p.ReadDeadline <= 0 {
return fmt.Errorf("invalid stream read deadline: %s", errSuffix)
}
if p.WriteDeadline <= 0 {
return fmt.Errorf("invalid write deadline: %s", errSuffix)
}
if p.ReadCARDeadline <= 0 {
return fmt.Errorf("invalid read CAR deadline: %s", errSuffix)
}
if p.BufferSize <= 0 {
return fmt.Errorf("invalid buffer size: %s", errSuffix)
}
return nil
}

// WithProtocolSuffix is a functional option that configures the `protocolSuffix` parameter
func WithProtocolSuffix(protocolSuffix string) Option {
return func(parameters *Parameters) {
parameters.protocolSuffix = protocolSuffix
}
}

func protocolID(protocolSuffix string) protocol.ID {
return protocol.ID(fmt.Sprintf("%s%s", protocolPrefix, protocolSuffix))
}
Loading

0 comments on commit 2a02f2f

Please sign in to comment.