Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

!feat(share/p2p/shrex-nd): rework shrex-nd to serve data in multiple messages #2

Draft
wants to merge 5 commits into
base: v0.11.0-rc9-fork
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions share/eds/edstest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/rsmt2d"

Expand All @@ -29,3 +30,16 @@ func RandEDS(t require.TestingT, size int) *rsmt2d.ExtendedDataSquare {
require.NoError(t, err, "failure to recompute the extended data square")
return eds
}

func RandEDSWithNamespace(
t require.TestingT,
namespace share.Namespace,
size int,
) (*rsmt2d.ExtendedDataSquare, da.DataAvailabilityHeader) {
shares := sharetest.RandSharesWithNamespace(t, namespace, size*size)
eds, err := rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), wrapper.NewConstructor(uint64(size)))
require.NoError(t, err, "failure to recompute the extended data square")
dah, err := da.NewDataAvailabilityHeader(eds)
require.NoError(t, err)
return eds, dah
}
34 changes: 31 additions & 3 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package getters

import (
"bytes"
"context"
"encoding/binary"
"errors"
"math/rand"
"sort"
"testing"
"time"

Expand All @@ -17,6 +20,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/wrapper"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"
Expand All @@ -31,6 +35,7 @@ import (
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestShrexGetter(t *testing.T) {
Expand Down Expand Up @@ -58,12 +63,13 @@ func TestShrexGetter(t *testing.T) {
getter := NewShrexGetter(edsClient, ndClient, peerManager)
require.NoError(t, getter.Start(ctx))

t.Run("ND_Available", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Run("ND_Available, total data size > 1mb", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
t.Cleanup(cancel)

// generate test data
randEDS, dah, namespace := generateTestEDS(t)
namespace := sharetest.RandV0Namespace()
randEDS, dah := singleNamespaceEds(t, namespace, 64)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), randEDS))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Expand Down Expand Up @@ -368,3 +374,25 @@ func TestAddToNamespace(t *testing.T) {
})
}
}

func singleNamespaceEds(
t require.TestingT,
namespace share.Namespace,
size int,
) (*rsmt2d.ExtendedDataSquare, da.DataAvailabilityHeader) {
shares := make([]share.Share, size*size)
rnd := rand.New(rand.NewSource(time.Now().Unix()))
for i := range shares {
shr := make([]byte, share.Size)
copy(share.GetNamespace(shr), namespace)
_, err := rnd.Read(share.GetData(shr))
require.NoError(t, err)
shares[i] = shr
}
sort.Slice(shares, func(i, j int) bool { return bytes.Compare(shares[i], shares[j]) < 0 })
eds, err := rsmt2d.ComputeExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), wrapper.NewConstructor(uint64(size)))
require.NoError(t, err, "failure to recompute the extended data square")
dah, err := da.NewDataAvailabilityHeader(eds)
require.NoError(t, err)
return eds, dah
}
2 changes: 2 additions & 0 deletions share/p2p/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
// available at the moment. The request may be retried later, but it's unlikely to succeed.
var ErrNotFound = errors.New("the requested data or resource could not be found")

var ErrRateLimited = errors.New("server is overloaded and rate limited the request")

// ErrInvalidResponse is returned when a peer returns an invalid response or caused an internal
// error. It is used to signal that the peer couldn't serve the data successfully, and should not be
// retried.
Expand Down
4 changes: 4 additions & 0 deletions share/p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ var meter = otel.Meter("shrex/eds")
type status string

const (
StatusBadRequest status = "bad_request"
StatusSendRespErr status = "send_resp_err"
StatusSendReqErr status = "send_req_err"
StatusReadRespErr status = "read_resp_err"
StatusInternalErr status = "internal_err"
StatusNotFound status = "not_found"
StatusTimeout status = "timeout"
Expand Down
105 changes: 57 additions & 48 deletions share/p2p/shrexnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewClient(params *Parameters, host host.Host) (*Client, error) {
}

// RequestND requests namespaced data from the given peer.
// Returns shares with unverified inclusion proofs against the share.Root.
// Returns NamespacedShares with unverified inclusion proofs against the share.Root.
func (c *Client) RequestND(
ctx context.Context,
root *share.Root,
Expand Down Expand Up @@ -73,7 +73,7 @@ func (c *Client) RequestND(
return nil, context.DeadlineExceeded
}
}
if err != p2p.ErrNotFound {
if err != p2p.ErrNotFound && err != p2p.ErrRateLimited {
log.Warnw("client-nd: peer returned err", "err", err)
}
return nil, err
Expand All @@ -100,6 +100,7 @@ func (c *Client) doRequest(

_, err = serde.Write(stream, req)
if err != nil {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusSendReqErr)
stream.Reset() //nolint:errcheck
return nil, fmt.Errorf("client-nd: writing request: %w", err)
}
Expand All @@ -109,59 +110,70 @@ func (c *Client) doRequest(
log.Debugw("client-nd: closing write side of the stream", "err", err)
}

var resp pb.GetSharesByNamespaceResponse
_, err = serde.Read(stream, &resp)
if err := c.readStatus(ctx, stream); err != nil {
return nil, err
}
return c.readNamespacedShares(ctx, stream)
}

func (c *Client) readStatus(ctx context.Context, stream network.Stream) error {
var resp pb.GetSharesByNamespaceStatusResponse
_, err := serde.Read(stream, &resp)
if err != nil {
// server is overloaded and closed the stream
if errors.Is(err, io.EOF) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited)
return nil, p2p.ErrNotFound
return p2p.ErrRateLimited
}
c.metrics.ObserveRequests(ctx, 1, p2p.StatusReadRespErr)
stream.Reset() //nolint:errcheck
return nil, fmt.Errorf("client-nd: reading response: %w", err)
return fmt.Errorf("client-nd: reading status response: %w", err)
}

return c.convertResponse(ctx, resp)
return c.convertStatusToErr(ctx, resp.Status)
}

// convertToNamespacedShares converts proto Rows to share.NamespacedShares
func convertToNamespacedShares(rows []*pb.Row) share.NamespacedShares {
shares := make([]share.NamespacedRow, 0, len(rows))
for _, row := range rows {
var proof *nmt.Proof
// readNamespacedShares converts proto Rows to share.NamespacedShares
func (c *Client) readNamespacedShares(
ctx context.Context,
stream network.Stream,
) (share.NamespacedShares, error) {
var shares share.NamespacedShares
for {
var row pb.NamespaceRowResponse
_, err := serde.Read(stream, &row)
if err != nil {
if errors.Is(err, io.EOF) {
// all data is received and steam is closed by server
return shares, nil
}
c.metrics.ObserveRequests(ctx, 1, p2p.StatusReadRespErr)
return nil, err
}
var proof nmt.Proof
if row.Proof != nil {
tmpProof := nmt.NewInclusionProof(
int(row.Proof.Start),
int(row.Proof.End),
row.Proof.Nodes,
row.Proof.IsMaxNamespaceIgnored,
)
proof = &tmpProof
if len(row.Shares) != 0 {
proof = nmt.NewInclusionProof(
int(row.Proof.Start),
int(row.Proof.End),
row.Proof.Nodes,
row.Proof.IsMaxNamespaceIgnored,
)
} else {
proof = nmt.NewAbsenceProof(
int(row.Proof.Start),
int(row.Proof.End),
row.Proof.Nodes,
row.Proof.LeafHash,
row.Proof.IsMaxNamespaceIgnored,
)
}
}

shares = append(shares, share.NamespacedRow{
Shares: row.Shares,
Proof: proof,
})
}
return shares
}

func convertToNonInclusionProofs(rows []*pb.Row) share.NamespacedShares {
shares := make([]share.NamespacedRow, 0, len(rows))
for _, row := range rows {
proof := nmt.NewAbsenceProof(
int(row.Proof.Start),
int(row.Proof.End),
row.Proof.Nodes,
row.Proof.LeafHash,
row.Proof.IsMaxNamespaceIgnored,
)
shares = append(shares, share.NamespacedRow{
Proof: &proof,
Proof: &proof,
})
}
return shares
}

func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream) {
Expand Down Expand Up @@ -192,23 +204,20 @@ func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream)
}
}

func (c *Client) convertResponse(
ctx context.Context, resp pb.GetSharesByNamespaceResponse) (share.NamespacedShares, error) {
switch resp.Status {
func (c *Client) convertStatusToErr(ctx context.Context, status pb.StatusCode) error {
switch status {
case pb.StatusCode_OK:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess)
return convertToNamespacedShares(resp.Rows), nil
case pb.StatusCode_NAMESPACE_NOT_FOUND:
return convertToNonInclusionProofs(resp.Rows), nil
return nil
case pb.StatusCode_NOT_FOUND:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return nil, p2p.ErrNotFound
return p2p.ErrNotFound
case pb.StatusCode_INVALID:
log.Debug("client-nd: invalid request")
log.Warn("client-nd: invalid request")
fallthrough
case pb.StatusCode_INTERNAL:
fallthrough
default:
return nil, p2p.ErrInvalidResponse
return p2p.ErrInvalidResponse
}
}
2 changes: 1 addition & 1 deletion share/p2p/shrexnd/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestExchange_RequestND(t *testing.T) {
// wait until all server slots are taken
wg.Wait()
_, err = client.RequestND(ctx, nil, sharetest.RandV0Namespace(), server.host.ID())
require.ErrorIs(t, err, p2p.ErrNotFound)
require.ErrorIs(t, err, p2p.ErrRateLimited)
})
}

Expand Down
Loading