diff --git a/libs/utils/close.go b/libs/utils/close.go new file mode 100644 index 0000000000..cbcc7b6f67 --- /dev/null +++ b/libs/utils/close.go @@ -0,0 +1,13 @@ +package utils + +import ( + "io" + + logging "github.com/ipfs/go-log/v2" +) + +func CloseAndLog(log logging.StandardLogger, name string, closer io.Closer) { + if err := closer.Close(); err != nil { + log.Warnf("closing %s: %s", name, err) + } +} diff --git a/share/getters/utils.go b/libs/utils/ctx.go similarity index 53% rename from share/getters/utils.go rename to libs/utils/ctx.go index 2260183b4f..5a13232a16 100644 --- a/share/getters/utils.go +++ b/libs/utils/ctx.go @@ -1,24 +1,22 @@ -package getters +package utils import ( "context" - "errors" "time" - - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel" ) -var ( - tracer = otel.Tracer("share/getters") - log = logging.Logger("share/getters") +// ResetContextOnError returns a fresh context if the given context has an error. +func ResetContextOnError(ctx context.Context) context.Context { + if ctx.Err() != nil { + ctx = context.Background() + } - errOperationNotSupported = errors.New("operation is not supported") -) + return ctx +} -// ctxWithSplitTimeout will split timeout stored in context by splitFactor and return the result if +// CtxWithSplitTimeout will split timeout stored in context by splitFactor and return the result if // it is greater than minTimeout. minTimeout == 0 will be ignored, splitFactor <= 0 will be ignored -func ctxWithSplitTimeout( +func CtxWithSplitTimeout( ctx context.Context, splitFactor int, minTimeout time.Duration, @@ -42,16 +40,3 @@ func ctxWithSplitTimeout( } return context.WithTimeout(ctx, splitTimeout) } - -// ErrorContains reports whether any error in err's tree matches any error in targets tree. -func ErrorContains(err, target error) bool { - if errors.Is(err, target) || target == nil { - return true - } - - target = errors.Unwrap(target) - if target == nil { - return false - } - return ErrorContains(err, target) -} diff --git a/share/getters/utils_test.go b/libs/utils/ctx_test.go similarity index 62% rename from share/getters/utils_test.go rename to libs/utils/ctx_test.go index ce94d3ac04..c1fccfa48a 100644 --- a/share/getters/utils_test.go +++ b/libs/utils/ctx_test.go @@ -1,119 +1,13 @@ -package getters +package utils import ( "context" - "errors" - "fmt" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func Test_ErrorContains(t *testing.T) { - err1 := errors.New("1") - err2 := errors.New("2") - - w1 := func(err error) error { - return fmt.Errorf("wrap1: %w", err) - } - w2 := func(err error) error { - return fmt.Errorf("wrap1: %w", err) - } - - type args struct { - err error - target error - } - tests := []struct { - name string - args args - want bool - }{ - { - "nil err", - args{ - err: nil, - target: err1, - }, - false, - }, - { - "nil target", - args{ - err: err1, - target: nil, - }, - true, - }, - { - "errors.Is true", - args{ - err: w1(err1), - target: err1, - }, - true, - }, - { - "errors.Is false", - args{ - err: w1(err1), - target: err2, - }, - false, - }, - { - "same wrap but different base error", - args{ - err: w1(err1), - target: w1(err2), - }, - false, - }, - { - "both wrapped true", - args{ - err: w1(err1), - target: w2(err1), - }, - true, - }, - { - "both wrapped false", - args{ - err: w1(err1), - target: w2(err2), - }, - false, - }, - { - "multierr first in slice", - args{ - err: errors.Join(w1(err1), w2(err2)), - target: w2(err1), - }, - true, - }, - { - "multierr second in slice", - args{ - err: errors.Join(w1(err1), w2(err2)), - target: w1(err2), - }, - true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, - tt.want, - ErrorContains(tt.args.err, tt.args.target), - "ErrorContains(%v, %v)", tt.args.err, tt.args.target) - }) - } -} - func Test_ctxWithSplitTimeout(t *testing.T) { type args struct { ctxTimeout time.Duration @@ -216,7 +110,7 @@ func Test_ctxWithSplitTimeout(t *testing.T) { ctx, cancel = context.WithTimeout(ctx, tt.args.ctxTimeout) } t.Cleanup(cancel) - got, _ := ctxWithSplitTimeout(ctx, sf, tt.args.minTimeout) + got, _ := CtxWithSplitTimeout(ctx, sf, tt.args.minTimeout) dl, ok := got.Deadline() // in case no deadline is found in ctx or not expected to be found, check both cases apply at the // same time diff --git a/libs/utils/error.go b/libs/utils/error.go new file mode 100644 index 0000000000..0c64cb4fe8 --- /dev/null +++ b/libs/utils/error.go @@ -0,0 +1,16 @@ +package utils + +import "errors" + +// ErrorContains reports whether any error in err's tree matches any error in targets tree. +func ErrorContains(err, target error) bool { + if errors.Is(err, target) || target == nil { + return true + } + + target = errors.Unwrap(target) + if target == nil { + return false + } + return ErrorContains(err, target) +} diff --git a/libs/utils/error_test.go b/libs/utils/error_test.go new file mode 100644 index 0000000000..8fe1711776 --- /dev/null +++ b/libs/utils/error_test.go @@ -0,0 +1,112 @@ +package utils + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_ErrorContains(t *testing.T) { + err1 := errors.New("1") + err2 := errors.New("2") + + w1 := func(err error) error { + return fmt.Errorf("wrap1: %w", err) + } + w2 := func(err error) error { + return fmt.Errorf("wrap1: %w", err) + } + + type args struct { + err error + target error + } + tests := []struct { + name string + args args + want bool + }{ + { + "nil err", + args{ + err: nil, + target: err1, + }, + false, + }, + { + "nil target", + args{ + err: err1, + target: nil, + }, + true, + }, + { + "errors.Is true", + args{ + err: w1(err1), + target: err1, + }, + true, + }, + { + "errors.Is false", + args{ + err: w1(err1), + target: err2, + }, + false, + }, + { + "same wrap but different base error", + args{ + err: w1(err1), + target: w1(err2), + }, + false, + }, + { + "both wrapped true", + args{ + err: w1(err1), + target: w2(err1), + }, + true, + }, + { + "both wrapped false", + args{ + err: w1(err1), + target: w2(err2), + }, + false, + }, + { + "multierr first in slice", + args{ + err: errors.Join(w1(err1), w2(err2)), + target: w2(err1), + }, + true, + }, + { + "multierr second in slice", + args{ + err: errors.Join(w1(err1), w2(err2)), + target: w1(err2), + }, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, + tt.want, + ErrorContains(tt.args.err, tt.args.target), + "ErrorContains(%v, %v)", tt.args.err, tt.args.target) + }) + } +} diff --git a/libs/utils/resetctx.go b/libs/utils/resetctx.go deleted file mode 100644 index a108cc27b4..0000000000 --- a/libs/utils/resetctx.go +++ /dev/null @@ -1,14 +0,0 @@ -package utils - -import ( - "context" -) - -// ResetContextOnError returns a fresh context if the given context has an error. -func ResetContextOnError(ctx context.Context) context.Context { - if ctx.Err() != nil { - ctx = context.Background() - } - - return ctx -} diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index 9c3bf7e646..d892120541 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -11,6 +11,7 @@ import ( "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/ipld" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/getter" ) func newShareModule(getter share.Getter, avail share.Availability) Module { @@ -42,7 +43,7 @@ func ensureEmptyEDSInBS(ctx context.Context, bServ blockservice.BlockService) er } func lightGetter( - shrexGetter *getters.ShrexGetter, + shrexGetter *getter.ShrexGetter, ipldGetter *getters.IPLDGetter, cfg Config, ) share.Getter { @@ -60,7 +61,7 @@ func lightGetter( // manual after corruption is detected). func bridgeGetter( storeGetter *getters.StoreGetter, - shrexGetter *getters.ShrexGetter, + shrexGetter *getter.ShrexGetter, cfg Config, ) share.Getter { var cascade []share.Getter @@ -73,7 +74,7 @@ func bridgeGetter( func fullGetter( storeGetter *getters.StoreGetter, - shrexGetter *getters.ShrexGetter, + shrexGetter *getter.ShrexGetter, ipldGetter *getters.IPLDGetter, cfg Config, ) share.Getter { diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index 5d0dbbd096..aa93a84410 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -19,6 +19,7 @@ import ( "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/getter" ) func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option { @@ -92,8 +93,8 @@ func shrexComponents(tp node.Type, cfg *Config) fx.Option { edsClient *shrexeds.Client, ndClient *shrexnd.Client, managers map[string]*peers.Manager, - ) *getters.ShrexGetter { - return getters.NewShrexGetter( + ) *getter.ShrexGetter { + return getter.NewShrexGetter( edsClient, ndClient, managers[fullNodesTag], @@ -101,10 +102,10 @@ func shrexComponents(tp node.Type, cfg *Config) fx.Option { lightprune.Window, ) }, - fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error { + fx.OnStart(func(ctx context.Context, getter *getter.ShrexGetter) error { return getter.Start(ctx) }), - fx.OnStop(func(ctx context.Context, getter *getters.ShrexGetter) error { + fx.OnStop(func(ctx context.Context, getter *getter.ShrexGetter) error { return getter.Stop(ctx) }), )), diff --git a/nodebuilder/share/opts.go b/nodebuilder/share/opts.go index 9c122b7b0f..773c59af17 100644 --- a/nodebuilder/share/opts.go +++ b/nodebuilder/share/opts.go @@ -4,11 +4,11 @@ import ( "errors" "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/getters" disc "github.com/celestiaorg/celestia-node/share/p2p/discovery" "github.com/celestiaorg/celestia-node/share/p2p/peers" "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/getter" ) // WithPeerManagerMetrics is a utility function to turn on peer manager metrics and that is @@ -49,7 +49,7 @@ func WithShrexServerMetrics(edsServer *shrexeds.Server, ndServer *shrexnd.Server return ndServer.WithMetrics() } -func WithShrexGetterMetrics(sg *getters.ShrexGetter) error { +func WithShrexGetterMetrics(sg *getter.ShrexGetter) error { return sg.WithMetrics() } diff --git a/nodebuilder/tests/nd_test.go b/nodebuilder/tests/nd_test.go index 338aa6d0c1..7515205c7d 100644 --- a/nodebuilder/tests/nd_test.go +++ b/nodebuilder/tests/nd_test.go @@ -20,6 +20,7 @@ import ( "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/getter" ) func TestShrexNDFromLights(t *testing.T) { @@ -200,7 +201,7 @@ func replaceShareGetter() fx.Option { host host.Host, store *eds.Store, storeGetter *getters.StoreGetter, - shrexGetter *getters.ShrexGetter, + shrexGetter *getter.ShrexGetter, network p2p.Network, ) share.Getter { cascade := make([]share.Getter, 0, 2) diff --git a/nodebuilder/tests/prune_test.go b/nodebuilder/tests/prune_test.go index 15b939f465..42a19b717a 100644 --- a/nodebuilder/tests/prune_test.go +++ b/nodebuilder/tests/prune_test.go @@ -19,10 +19,10 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp" "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/p2p/peers" "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/getter" ) // TestArchivalBlobSync tests whether a LN is able to sync historical blobs from @@ -71,15 +71,15 @@ func TestArchivalBlobSync(t *testing.T) { edsClient *shrexeds.Client, ndClient *shrexnd.Client, managers map[string]*peers.Manager, - ) *getters.ShrexGetter { - return getters.NewShrexGetter( + ) *getter.ShrexGetter { + return getter.NewShrexGetter( edsClient, ndClient, managers["full"], managers["archival"], testAvailWindow, ) - }, new(getters.ShrexGetter)), + }, new(getter.ShrexGetter)), ) // stop the archival BN to force LN to have to discover diff --git a/share/getters/cascade.go b/share/getters/cascade.go index 3875127580..574b70153c 100644 --- a/share/getters/cascade.go +++ b/share/getters/cascade.go @@ -4,6 +4,8 @@ import ( "context" "errors" + logging "github.com/ipfs/go-log/v2" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -15,6 +17,11 @@ import ( "github.com/celestiaorg/celestia-node/share/eds/byzantine" ) +var ( + tracer = otel.Tracer("share/getters") + log = logging.Logger("share/getters") +) + var _ share.Getter = (*CascadeGetter)(nil) // CascadeGetter implements custom share.Getter that composes multiple Getter implementations in @@ -125,14 +132,14 @@ func cascadeGetters[V any]( // we split the timeout between left getters // once async cascadegetter is implemented, we can remove this - getCtx, cancel := ctxWithSplitTimeout(ctx, len(getters)-i, 0) + getCtx, cancel := utils.CtxWithSplitTimeout(ctx, len(getters)-i, 0) val, getErr := get(getCtx, getter) cancel() if getErr == nil { return val, nil } - if errors.Is(getErr, errOperationNotSupported) { + if errors.Is(getErr, ErrOperationNotSupported) { continue } diff --git a/share/shwap/eds_id.go b/share/shwap/eds_id.go index 5ef37744ef..0709066182 100644 --- a/share/shwap/eds_id.go +++ b/share/shwap/eds_id.go @@ -12,7 +12,6 @@ const EdsIDSize = 8 // ErrOutOfBounds is returned whenever an index is out of bounds. var ( ErrInvalidShwapID = errors.New("invalid shwap ID") - ErrOutOfBounds = fmt.Errorf("index out of bounds: %w", ErrInvalidShwapID) ) // EdsID represents a unique identifier for a row, using the height of the block diff --git a/share/shwap/getter.go b/share/shwap/getter.go new file mode 100644 index 0000000000..334a2e356c --- /dev/null +++ b/share/shwap/getter.go @@ -0,0 +1,42 @@ +package shwap + +import ( + "context" + "errors" + "fmt" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/share" +) + +var ( + // ErrOperationNotSupported is used to indicate that the operation is not supported by the + // implementation of the getter interface. + ErrOperationNotSupported = errors.New("operation is not supported") + // ErrNotFound is used to indicate that requested data could not be found. + ErrNotFound = errors.New("data not found") + // ErrOutOfBounds is used to indicate that a passed row or column index is out of bounds of the + // square size. + ErrOutOfBounds = fmt.Errorf("index out of bounds: %w", ErrInvalidShwapID) +) + +// Getter interface provides a set of accessors for shares by the Root. +// Automatically verifies integrity of shares(exceptions possible depending on the implementation). +// +//go:generate mockgen -destination=mocks/getter.go -package=mocks . Getter +type Getter interface { + // GetShare gets a Share by coordinates in EDS. + GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (share.Share, error) + + // GetEDS gets the full EDS identified by the given extended header. + GetEDS(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) + + // GetSharesByNamespace gets all shares from an EDS within the given namespace. + // Shares are returned in a row-by-row order if the namespace spans multiple rows. + // Inclusion of returned data could be verified using Verify method on NamespacedShares. + // If no shares are found for target namespace non-inclusion could be also verified by calling + // Verify method. + GetSharesByNamespace(context.Context, *header.ExtendedHeader, share.Namespace) (NamespacedData, error) +} diff --git a/share/p2p/discovery/backoff.go b/share/shwap/p2p/discovery/backoff.go similarity index 100% rename from share/p2p/discovery/backoff.go rename to share/shwap/p2p/discovery/backoff.go diff --git a/share/p2p/discovery/backoff_test.go b/share/shwap/p2p/discovery/backoff_test.go similarity index 100% rename from share/p2p/discovery/backoff_test.go rename to share/shwap/p2p/discovery/backoff_test.go diff --git a/share/p2p/discovery/discovery.go b/share/shwap/p2p/discovery/discovery.go similarity index 100% rename from share/p2p/discovery/discovery.go rename to share/shwap/p2p/discovery/discovery.go diff --git a/share/p2p/discovery/discovery_test.go b/share/shwap/p2p/discovery/discovery_test.go similarity index 100% rename from share/p2p/discovery/discovery_test.go rename to share/shwap/p2p/discovery/discovery_test.go diff --git a/share/p2p/discovery/metrics.go b/share/shwap/p2p/discovery/metrics.go similarity index 100% rename from share/p2p/discovery/metrics.go rename to share/shwap/p2p/discovery/metrics.go diff --git a/share/p2p/discovery/options.go b/share/shwap/p2p/discovery/options.go similarity index 100% rename from share/p2p/discovery/options.go rename to share/shwap/p2p/discovery/options.go diff --git a/share/p2p/discovery/set.go b/share/shwap/p2p/discovery/set.go similarity index 100% rename from share/p2p/discovery/set.go rename to share/shwap/p2p/discovery/set.go diff --git a/share/p2p/discovery/set_test.go b/share/shwap/p2p/discovery/set_test.go similarity index 100% rename from share/p2p/discovery/set_test.go rename to share/shwap/p2p/discovery/set_test.go diff --git a/share/p2p/doc.go b/share/shwap/p2p/shrex/doc.go similarity index 88% rename from share/p2p/doc.go rename to share/shwap/p2p/shrex/doc.go index 991ddf94db..9654532842 100644 --- a/share/p2p/doc.go +++ b/share/shwap/p2p/shrex/doc.go @@ -1,4 +1,4 @@ -// Package p2p provides p2p functionality that powers the share exchange protocols used by celestia-node. +// Package shrex provides functionality that powers the share exchange protocols used by celestia-node. // The available protocols are: // // - shrexsub : a floodsub-based pubsub protocol that is used to broadcast/subscribe to the event @@ -15,4 +15,4 @@ // and is primarily used by `getters.ShrexGetter` in share/getters/shrex.go. // // Find out more about each protocol in their respective sub-packages. -package p2p +package shrex diff --git a/share/p2p/errors.go b/share/shwap/p2p/shrex/errors.go similarity index 98% rename from share/p2p/errors.go rename to share/shwap/p2p/shrex/errors.go index cb7b596f47..79ff0ed2b2 100644 --- a/share/p2p/errors.go +++ b/share/shwap/p2p/shrex/errors.go @@ -1,4 +1,4 @@ -package p2p +package shrex import ( "errors" diff --git a/share/getters/shrex.go b/share/shwap/p2p/shrex/getter/shrex.go similarity index 82% rename from share/getters/shrex.go rename to share/shwap/p2p/shrex/getter/shrex.go index 09601d60de..92040b77ef 100644 --- a/share/getters/shrex.go +++ b/share/shwap/p2p/shrex/getter/shrex.go @@ -1,4 +1,4 @@ -package getters +package getter import ( "context" @@ -6,6 +6,7 @@ import ( "fmt" "time" + logging "github.com/ipfs/go-log/v2" libpeer "github.com/libp2p/go-libp2p/core/peer" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -18,14 +19,20 @@ import ( "github.com/celestiaorg/celestia-node/libs/utils" "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/celestia-node/share/p2p" - "github.com/celestiaorg/celestia-node/share/p2p/peers" - "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" - "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexnd" ) -var _ share.Getter = (*ShrexGetter)(nil) +var ( + tracer = otel.Tracer("shrex/getter") + meter = otel.Meter("shrex/getter") + log = logging.Logger("shrex/getter") +) + +var _ shwap.Getter = (*ShrexGetter)(nil) const ( // defaultMinRequestTimeout value is set according to observed time taken by healthy peer to @@ -34,8 +41,6 @@ const ( defaultMinAttemptsCount = 3 ) -var meter = otel.Meter("shrex/getter") - type metrics struct { edsAttempts metric.Int64Histogram ndAttempts metric.Int64Histogram @@ -141,7 +146,7 @@ func (sg *ShrexGetter) Stop(ctx context.Context) error { } func (sg *ShrexGetter) GetShare(context.Context, *header.ExtendedHeader, int, int) (share.Share, error) { - return nil, fmt.Errorf("getter/shrex: GetShare %w", errOperationNotSupported) + return nil, fmt.Errorf("getter/shrex: GetShare %w", shwap.ErrOperationNotSupported) } func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { @@ -176,8 +181,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader } reqStart := time.Now() - reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout) - eds, getErr := sg.edsClient.RequestEDS(reqCtx, header.DAH.Hash(), peer) + reqCtx, cancel := utils.CtxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout) + eds, getErr := sg.edsClient.RequestEDS(reqCtx, header.DAH, header.Height(), peer) cancel() switch { case getErr == nil: @@ -187,16 +192,16 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): setStatus(peers.ResultCooldownPeer) - case errors.Is(getErr, p2p.ErrNotFound): - getErr = share.ErrNotFound + case errors.Is(getErr, shrex.ErrNotFound): + getErr = shwap.ErrNotFound setStatus(peers.ResultCooldownPeer) - case errors.Is(getErr, p2p.ErrInvalidResponse): + case errors.Is(getErr, shrex.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: setStatus(peers.ResultCooldownPeer) } - if !ErrorContains(err, getErr) { + if !utils.ErrorContains(err, getErr) { err = errors.Join(err, getErr) } log.Debugw("eds: request failed", @@ -212,7 +217,7 @@ func (sg *ShrexGetter) GetSharesByNamespace( ctx context.Context, header *header.ExtendedHeader, namespace share.Namespace, -) (share.NamespacedShares, error) { +) (shwap.NamespacedData, error) { if err := namespace.ValidateForData(); err != nil { return nil, err } @@ -229,9 +234,9 @@ func (sg *ShrexGetter) GetSharesByNamespace( // verify that the namespace could exist inside the roots before starting network requests dah := header.DAH - roots := ipld.FilterRootByNamespace(dah, namespace) - if len(roots) == 0 { - return []share.NamespacedRow{}, nil + rowIdxs := share.RowsWithNamespace(dah, namespace) + if len(rowIdxs) == 0 { + return shwap.NamespacedData{}, nil } for { @@ -254,13 +259,13 @@ func (sg *ShrexGetter) GetSharesByNamespace( } reqStart := time.Now() - reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout) - nd, getErr := sg.ndClient.RequestND(reqCtx, dah, namespace, peer) + reqCtx, cancel := utils.CtxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout) + nd, getErr := sg.ndClient.RequestND(reqCtx, header.Height(), namespace, peer) cancel() switch { case getErr == nil: // both inclusion and non-inclusion cases needs verification - if verErr := nd.Verify(dah, namespace); verErr != nil { + if verErr := nd.Validate(dah, namespace); verErr != nil { getErr = verErr setStatus(peers.ResultBlacklistPeer) break @@ -271,16 +276,16 @@ func (sg *ShrexGetter) GetSharesByNamespace( case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): setStatus(peers.ResultCooldownPeer) - case errors.Is(getErr, p2p.ErrNotFound): - getErr = share.ErrNotFound + case errors.Is(getErr, shrex.ErrNotFound): + getErr = shwap.ErrNotFound setStatus(peers.ResultCooldownPeer) - case errors.Is(getErr, p2p.ErrInvalidResponse): + case errors.Is(getErr, shrex.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: setStatus(peers.ResultCooldownPeer) } - if !ErrorContains(err, getErr) { + if !utils.ErrorContains(err, getErr) { err = errors.Join(err, getErr) } log.Debugw("nd: request failed", diff --git a/share/getters/shrex_test.go b/share/shwap/p2p/shrex/getter/shrex_test.go similarity index 83% rename from share/getters/shrex_test.go rename to share/shwap/p2p/shrex/getter/shrex_test.go index a74fd8b890..129437e369 100644 --- a/share/getters/shrex_test.go +++ b/share/shwap/p2p/shrex/getter/shrex_test.go @@ -1,4 +1,4 @@ -package getters +package getter import ( "context" @@ -14,6 +14,7 @@ import ( mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/nmt" @@ -24,14 +25,14 @@ import ( "github.com/celestiaorg/celestia-node/pruner/full" "github.com/celestiaorg/celestia-node/pruner/light" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/edstest" - "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/celestia-node/share/p2p/peers" - "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" - "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" - "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" "github.com/celestiaorg/celestia-node/share/sharetest" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexnd" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" + "github.com/celestiaorg/celestia-node/store" ) func TestShrexGetter(t *testing.T) { @@ -46,8 +47,6 @@ func TestShrexGetter(t *testing.T) { // launch eds store and put test data into it edsStore, err := newStore(t) require.NoError(t, err) - err = edsStore.Start(ctx) - require.NoError(t, err) ndClient, _ := newNDClientServer(ctx, t, edsStore, srvHost, clHost) edsClient, _ := newEDSClientServer(ctx, t, edsStore, srvHost, clHost) @@ -63,6 +62,7 @@ func TestShrexGetter(t *testing.T) { getter := NewShrexGetter(edsClient, ndClient, fullPeerManager, archivalPeerManager, light.Window) require.NoError(t, getter.Start(ctx)) + height := atomic.NewUint64(1) t.Run("ND_Available, total data size > 1mb", func(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second*10) t.Cleanup(cancel) @@ -70,17 +70,21 @@ func TestShrexGetter(t *testing.T) { // generate test data size := 64 namespace := sharetest.RandV0Namespace() + height := height.Add(1) randEDS, roots := edstest.RandEDSWithNamespace(t, namespace, size*size, size) eh := headertest.RandExtendedHeaderWithRoot(t, roots) - require.NoError(t, edsStore.Put(ctx, roots.Hash(), randEDS)) + eh.RawHeader.Height = int64(height) + + err = edsStore.Put(ctx, roots, height, randEDS) + require.NoError(t, err) fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), - Height: 1, + Height: height, }) got, err := getter.GetSharesByNamespace(ctx, eh, namespace) require.NoError(t, err) - require.NoError(t, got.Verify(roots, namespace)) + require.NoError(t, got.Validate(roots, namespace)) }) t.Run("ND_err_not_found", func(t *testing.T) { @@ -88,15 +92,18 @@ func TestShrexGetter(t *testing.T) { t.Cleanup(cancel) // generate test data + height := height.Add(1) _, roots, namespace := generateTestEDS(t) eh := headertest.RandExtendedHeaderWithRoot(t, roots) + eh.RawHeader.Height = int64(height) + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), - Height: 1, + Height: height, }) _, err := getter.GetSharesByNamespace(ctx, eh, namespace) - require.ErrorIs(t, err, share.ErrNotFound) + require.ErrorIs(t, err, shwap.ErrNotFound) }) t.Run("ND_namespace_not_included", func(t *testing.T) { @@ -104,37 +111,41 @@ func TestShrexGetter(t *testing.T) { t.Cleanup(cancel) // generate test data + height := height.Add(1) eds, roots, maxNamespace := generateTestEDS(t) eh := headertest.RandExtendedHeaderWithRoot(t, roots) - require.NoError(t, edsStore.Put(ctx, roots.Hash(), eds)) + eh.RawHeader.Height = int64(height) + + err = edsStore.Put(ctx, roots, height, eds) + require.NoError(t, err) fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), - Height: 1, + Height: height, }) // namespace inside root range nID, err := addToNamespace(maxNamespace, -1) require.NoError(t, err) // check for namespace to be between max and min namespace in root - require.Len(t, ipld.FilterRootByNamespace(roots, nID), 1) + require.Len(t, share.RowsWithNamespace(roots, nID), 1) emptyShares, err := getter.GetSharesByNamespace(ctx, eh, nID) require.NoError(t, err) // no shares should be returned require.Nil(t, emptyShares.Flatten()) - require.Nil(t, emptyShares.Verify(roots, nID)) + require.Nil(t, emptyShares.Validate(roots, nID)) // namespace outside root range nID, err = addToNamespace(maxNamespace, 1) require.NoError(t, err) // check for namespace to be not in root - require.Len(t, ipld.FilterRootByNamespace(roots, nID), 0) + require.Len(t, share.RowsWithNamespace(roots, nID), 0) emptyShares, err = getter.GetSharesByNamespace(ctx, eh, nID) require.NoError(t, err) // no shares should be returned require.Nil(t, emptyShares.Flatten()) - require.Nil(t, emptyShares.Verify(roots, nID)) + require.Nil(t, emptyShares.Validate(roots, nID)) }) t.Run("ND_namespace_not_in_dah", func(t *testing.T) { @@ -143,23 +154,27 @@ func TestShrexGetter(t *testing.T) { // generate test data eds, roots, maxNamespace := generateTestEDS(t) + height := height.Add(1) eh := headertest.RandExtendedHeaderWithRoot(t, roots) - require.NoError(t, edsStore.Put(ctx, roots.Hash(), eds)) + eh.RawHeader.Height = int64(height) + + err = edsStore.Put(ctx, roots, height, eds) + require.NoError(t, err) fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), - Height: 1, + Height: height, }) namespace, err := addToNamespace(maxNamespace, 1) require.NoError(t, err) // check for namespace to be not in root - require.Len(t, ipld.FilterRootByNamespace(roots, namespace), 0) + require.Len(t, share.RowsWithNamespace(roots, namespace), 0) emptyShares, err := getter.GetSharesByNamespace(ctx, eh, namespace) require.NoError(t, err) // no shares should be returned require.Empty(t, emptyShares.Flatten()) - require.Nil(t, emptyShares.Verify(roots, namespace)) + require.Nil(t, emptyShares.Validate(roots, namespace)) }) t.Run("EDS_Available", func(t *testing.T) { @@ -168,11 +183,15 @@ func TestShrexGetter(t *testing.T) { // generate test data randEDS, roots, _ := generateTestEDS(t) + height := height.Add(1) eh := headertest.RandExtendedHeaderWithRoot(t, roots) - require.NoError(t, edsStore.Put(ctx, roots.Hash(), randEDS)) + eh.RawHeader.Height = int64(height) + + err = edsStore.Put(ctx, roots, height, randEDS) + require.NoError(t, err) fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), - Height: 1, + Height: height, }) got, err := getter.GetEDS(ctx, eh) @@ -185,10 +204,13 @@ func TestShrexGetter(t *testing.T) { // generate test data _, roots, _ := generateTestEDS(t) + height := height.Add(1) eh := headertest.RandExtendedHeaderWithRoot(t, roots) + eh.RawHeader.Height = int64(height) + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), - Height: 1, + Height: height, }) cancel() @@ -202,14 +224,17 @@ func TestShrexGetter(t *testing.T) { // generate test data _, roots, _ := generateTestEDS(t) + height := height.Add(1) eh := headertest.RandExtendedHeaderWithRoot(t, roots) + eh.RawHeader.Height = int64(height) + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), - Height: 1, + Height: height, }) _, err := getter.GetEDS(ctx, eh) - require.ErrorIs(t, err, share.ErrNotFound) + require.ErrorIs(t, err, shwap.ErrNotFound) }) // tests getPeer's ability to route requests based on whether @@ -226,7 +251,9 @@ func TestShrexGetter(t *testing.T) { getter.archivalPeerManager.UpdateNodePool(archivalPeer.ID(), true) getter.fullPeerManager.UpdateNodePool(fullPeer.ID(), true) + height := height.Add(1) eh := headertest.RandExtendedHeader(t) + eh.RawHeader.Height = int64(height) // historical data expects an archival peer eh.RawHeader.Time = time.Now().Add(-(time.Duration(full.Window) + time.Second)) @@ -242,11 +269,10 @@ func TestShrexGetter(t *testing.T) { }) } -func newStore(t *testing.T) (*eds.Store, error) { +func newStore(t *testing.T) (*store.Store, error) { t.Helper() - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - return eds.NewStore(eds.DefaultParameters(), t.TempDir(), ds) + return store.NewStore(store.DefaultParameters(), t.TempDir()) } func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, *share.AxisRoots, share.Namespace) { @@ -280,7 +306,7 @@ func testManager( } func newNDClientServer( - ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host, + ctx context.Context, t *testing.T, edsStore *store.Store, srvHost, clHost host.Host, ) (*shrexnd.Client, *shrexnd.Server) { params := shrexnd.DefaultParameters() @@ -300,7 +326,7 @@ func newNDClientServer( } func newEDSClientServer( - ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host, + ctx context.Context, t *testing.T, edsStore *store.Store, srvHost, clHost host.Host, ) (*shrexeds.Client, *shrexeds.Server) { params := shrexeds.DefaultParameters() diff --git a/share/p2p/metrics.go b/share/shwap/p2p/shrex/metrics.go similarity index 99% rename from share/p2p/metrics.go rename to share/shwap/p2p/shrex/metrics.go index 55aefda81d..9d5c605139 100644 --- a/share/p2p/metrics.go +++ b/share/shwap/p2p/shrex/metrics.go @@ -1,4 +1,4 @@ -package p2p +package shrex import ( "context" diff --git a/share/p2p/middleware.go b/share/shwap/p2p/shrex/middleware.go similarity index 98% rename from share/p2p/middleware.go rename to share/shwap/p2p/shrex/middleware.go index df0a690af7..c53a996eec 100644 --- a/share/p2p/middleware.go +++ b/share/shwap/p2p/shrex/middleware.go @@ -1,4 +1,4 @@ -package p2p +package shrex import ( "sync/atomic" diff --git a/share/p2p/params.go b/share/shwap/p2p/shrex/params.go similarity index 99% rename from share/p2p/params.go rename to share/shwap/p2p/shrex/params.go index 6636e38fc5..f36221d548 100644 --- a/share/p2p/params.go +++ b/share/shwap/p2p/shrex/params.go @@ -1,4 +1,4 @@ -package p2p +package shrex import ( "fmt" diff --git a/share/p2p/peers/doc.go b/share/shwap/p2p/shrex/peers/doc.go similarity index 100% rename from share/p2p/peers/doc.go rename to share/shwap/p2p/shrex/peers/doc.go diff --git a/share/p2p/peers/manager.go b/share/shwap/p2p/shrex/peers/manager.go similarity index 99% rename from share/p2p/peers/manager.go rename to share/shwap/p2p/shrex/peers/manager.go index 0d2f6ac42b..857c5ee937 100644 --- a/share/p2p/peers/manager.go +++ b/share/shwap/p2p/shrex/peers/manager.go @@ -21,7 +21,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" ) const ( diff --git a/share/p2p/peers/manager_test.go b/share/shwap/p2p/shrex/peers/manager_test.go similarity index 99% rename from share/p2p/peers/manager_test.go rename to share/shwap/p2p/shrex/peers/manager_test.go index 2a465dc59a..390d368b02 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/shwap/p2p/shrex/peers/manager_test.go @@ -22,8 +22,8 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/p2p/discovery" - "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" ) func TestManager(t *testing.T) { @@ -76,6 +76,7 @@ func TestManager(t *testing.T) { t.Cleanup(cancel) // create headerSub mock + h := testHeader() headerSub := newSubLock(h, nil) diff --git a/share/p2p/peers/metrics.go b/share/shwap/p2p/shrex/peers/metrics.go similarity index 99% rename from share/p2p/peers/metrics.go rename to share/shwap/p2p/shrex/peers/metrics.go index b28b263127..d401d6a4fc 100644 --- a/share/p2p/peers/metrics.go +++ b/share/shwap/p2p/shrex/peers/metrics.go @@ -13,7 +13,7 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/celestiaorg/celestia-node/libs/utils" - "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" ) const ( diff --git a/share/p2p/peers/options.go b/share/shwap/p2p/shrex/peers/options.go similarity index 97% rename from share/p2p/peers/options.go rename to share/shwap/p2p/shrex/peers/options.go index 2970dd2465..e268550853 100644 --- a/share/p2p/peers/options.go +++ b/share/shwap/p2p/shrex/peers/options.go @@ -7,7 +7,7 @@ import ( libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" ) type Parameters struct { diff --git a/share/p2p/peers/pool.go b/share/shwap/p2p/shrex/peers/pool.go similarity index 100% rename from share/p2p/peers/pool.go rename to share/shwap/p2p/shrex/peers/pool.go diff --git a/share/p2p/peers/pool_test.go b/share/shwap/p2p/shrex/peers/pool_test.go similarity index 100% rename from share/p2p/peers/pool_test.go rename to share/shwap/p2p/shrex/peers/pool_test.go diff --git a/share/p2p/peers/timedqueue.go b/share/shwap/p2p/shrex/peers/timedqueue.go similarity index 100% rename from share/p2p/peers/timedqueue.go rename to share/shwap/p2p/shrex/peers/timedqueue.go diff --git a/share/p2p/peers/timedqueue_test.go b/share/shwap/p2p/shrex/peers/timedqueue_test.go similarity index 100% rename from share/p2p/peers/timedqueue_test.go rename to share/shwap/p2p/shrex/peers/timedqueue_test.go diff --git a/share/p2p/recovery.go b/share/shwap/p2p/shrex/recovery.go similarity index 96% rename from share/p2p/recovery.go rename to share/shwap/p2p/shrex/recovery.go index b214969399..67bcb98d73 100644 --- a/share/p2p/recovery.go +++ b/share/shwap/p2p/shrex/recovery.go @@ -1,4 +1,4 @@ -package p2p +package shrex import ( "fmt" diff --git a/share/p2p/shrexeds/client.go b/share/shwap/p2p/shrex/shrexeds/client.go similarity index 60% rename from share/p2p/shrexeds/client.go rename to share/shwap/p2p/shrex/shrexeds/client.go index d56e0e20f5..7dad597543 100644 --- a/share/p2p/shrexeds/client.go +++ b/share/shwap/p2p/shrex/shrexeds/client.go @@ -1,6 +1,7 @@ package shrexeds import ( + "bytes" "context" "errors" "fmt" @@ -17,9 +18,10 @@ import ( "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/p2p" - pb "github.com/celestiaorg/celestia-node/share/p2p/shrexeds/pb" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" + pb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds/pb" ) // Client is responsible for requesting EDSs for blocksync over the ShrEx/EDS protocol. @@ -28,7 +30,7 @@ type Client struct { protocolID protocol.ID host host.Host - metrics *p2p.Metrics + metrics *shrex.Metrics } // NewClient creates a new ShrEx/EDS client. @@ -40,23 +42,27 @@ func NewClient(params *Parameters, host host.Host) (*Client, error) { return &Client{ params: params, host: host, - protocolID: p2p.ProtocolID(params.NetworkID(), protocolString), + protocolID: shrex.ProtocolID(params.NetworkID(), protocolString), }, nil } // RequestEDS requests the ODS from the given peers and returns the EDS upon success. func (c *Client) RequestEDS( ctx context.Context, - dataHash share.DataHash, + root *share.AxisRoots, + height uint64, peer peer.ID, ) (*rsmt2d.ExtendedDataSquare, error) { - eds, err := c.doRequest(ctx, dataHash, peer) + eds, err := c.doRequest(ctx, root, height, peer) if err == nil { return eds, nil } - log.Debugw("client: eds request to peer failed", "peer", peer.String(), "hash", dataHash.String(), "error", err) + log.Debugw("client: eds request to peer failed", + "height", height, + "peer", peer.String(), + "error", err) if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusTimeout) return nil, err } // some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not @@ -64,14 +70,14 @@ func (c *Client) RequestEDS( var ne net.Error if errors.As(err, &ne) && ne.Timeout() { if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) { - c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusTimeout) return nil, context.DeadlineExceeded } } - if !errors.Is(err, p2p.ErrNotFound) { + if !errors.Is(err, shrex.ErrNotFound) { log.Warnw("client: eds request to peer failed", "peer", peer.String(), - "hash", dataHash.String(), + "height", height, "err", err) } @@ -80,27 +86,37 @@ func (c *Client) RequestEDS( func (c *Client) doRequest( ctx context.Context, - dataHash share.DataHash, + root *share.AxisRoots, + height uint64, to peer.ID, ) (*rsmt2d.ExtendedDataSquare, error) { streamOpenCtx, cancel := context.WithTimeout(ctx, c.params.ServerReadTimeout) defer cancel() stream, err := c.host.NewStream(streamOpenCtx, to, c.protocolID) if err != nil { - return nil, fmt.Errorf("failed to open stream: %w", err) + return nil, fmt.Errorf("open stream: %w", err) } defer stream.Close() c.setStreamDeadlines(ctx, stream) - req := &pb.EDSRequest{Hash: dataHash} + req, err := shwap.NewEdsID(height) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + rb, err := req.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } // request ODS - log.Debugw("client: requesting ods", "hash", dataHash.String(), "peer", to.String()) - _, err = serde.Write(stream, req) + log.Debugw("client: requesting ods", + "height", height, + "peer", to.String()) + _, err = stream.Write(rb) if err != nil { stream.Reset() //nolint:errcheck - return nil, fmt.Errorf("failed to write request to stream: %w", err) + return nil, fmt.Errorf("write request to stream: %w", err) } err = stream.CloseWrite() if err != nil { @@ -117,36 +133,61 @@ func (c *Client) doRequest( if err != nil { // server closes the stream here if we are rate limited if errors.Is(err, io.EOF) { - c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited) - return nil, p2p.ErrNotFound + c.metrics.ObserveRequests(ctx, 1, shrex.StatusRateLimited) + return nil, shrex.ErrNotFound } stream.Reset() //nolint:errcheck - return nil, fmt.Errorf("failed to read status from stream: %w", err) + return nil, fmt.Errorf("read status from stream: %w", err) } - switch resp.Status { case pb.Status_OK: // reset stream deadlines to original values, since read deadline was changed during status read c.setStreamDeadlines(ctx, stream) // use header and ODS bytes to construct EDS and verify it against dataHash - eds, err := eds.ReadEDS(ctx, stream, dataHash) + eds, err := readEds(ctx, stream, root) if err != nil { - return nil, fmt.Errorf("failed to read eds from ods bytes: %w", err) + return nil, fmt.Errorf("read eds from stream: %w", err) } - c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusSuccess) return eds, nil case pb.Status_NOT_FOUND: - c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) - return nil, p2p.ErrNotFound + c.metrics.ObserveRequests(ctx, 1, shrex.StatusNotFound) + return nil, shrex.ErrNotFound case pb.Status_INVALID: log.Debug("client: invalid request") fallthrough case pb.Status_INTERNAL: fallthrough default: - c.metrics.ObserveRequests(ctx, 1, p2p.StatusInternalErr) - return nil, p2p.ErrInvalidResponse + c.metrics.ObserveRequests(ctx, 1, shrex.StatusInternalErr) + return nil, shrex.ErrInvalidResponse + } +} + +func readEds(ctx context.Context, stream network.Stream, root *share.AxisRoots) (*rsmt2d.ExtendedDataSquare, error) { + odsSize := len(root.RowRoots) / 2 + shares, err := eds.ReadShares(stream, share.Size, odsSize) + if err != nil { + return nil, fmt.Errorf("failed to read eds from ods bytes: %w", err) + } + + // verify that the EDS hash matches the expected hash + rsmt2d, err := eds.Rsmt2DFromShares(shares, odsSize) + if err != nil { + return nil, fmt.Errorf("failed to create rsmt2d from shares: %w", err) + } + datahash, err := rsmt2d.DataHash(ctx) + if err != nil { + return nil, fmt.Errorf("failed to calculate data hash: %w", err) + } + if !bytes.Equal(datahash, root.Hash()) { + return nil, fmt.Errorf( + "content integrity mismatch: imported root %s doesn't match expected root %s", + datahash, + root.Hash(), + ) } + return rsmt2d.ExtendedDataSquare, nil } func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream) { diff --git a/share/p2p/shrexeds/doc.go b/share/shwap/p2p/shrex/shrexeds/doc.go similarity index 100% rename from share/p2p/shrexeds/doc.go rename to share/shwap/p2p/shrex/shrexeds/doc.go diff --git a/share/p2p/shrexeds/exchange_test.go b/share/shwap/p2p/shrex/shrexeds/exchange_test.go similarity index 68% rename from share/p2p/shrexeds/exchange_test.go rename to share/shwap/p2p/shrex/shrexeds/exchange_test.go index 450ad291f0..54fa585a70 100644 --- a/share/p2p/shrexeds/exchange_test.go +++ b/share/shwap/p2p/shrex/shrexeds/exchange_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" libhost "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -15,9 +13,10 @@ import ( "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/edstest" - "github.com/celestiaorg/celestia-node/share/p2p" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" + "github.com/celestiaorg/celestia-node/store" ) func TestExchange_RequestEDS(t *testing.T) { @@ -25,10 +24,7 @@ func TestExchange_RequestEDS(t *testing.T) { t.Cleanup(cancel) store, client, server := makeExchange(t) - err := store.Start(ctx) - require.NoError(t, err) - - err = server.Start(ctx) + err := server.Start(ctx) require.NoError(t, err) // Testcase: EDS is immediately available @@ -36,10 +32,11 @@ func TestExchange_RequestEDS(t *testing.T) { eds := edstest.RandEDS(t, 4) roots, err := share.NewAxisRoots(eds) require.NoError(t, err) - err = store.Put(ctx, roots.Hash(), eds) + height := uint64(1) + err = store.Put(ctx, roots, height, eds) require.NoError(t, err) - requestedEDS, err := client.RequestEDS(ctx, roots.Hash(), server.host.ID()) + requestedEDS, err := client.RequestEDS(ctx, roots, height, server.host.ID()) assert.NoError(t, err) assert.Equal(t, eds.Flattened(), requestedEDS.Flattened()) }) @@ -49,17 +46,18 @@ func TestExchange_RequestEDS(t *testing.T) { eds := edstest.RandEDS(t, 4) roots, err := share.NewAxisRoots(eds) require.NoError(t, err) + height := uint64(666) lock := make(chan struct{}) go func() { <-lock - err = store.Put(ctx, roots.Hash(), eds) + err := store.Put(ctx, roots, height, eds) require.NoError(t, err) lock <- struct{}{} }() - requestedEDS, err := client.RequestEDS(ctx, roots.Hash(), server.host.ID()) - assert.ErrorIs(t, err, p2p.ErrNotFound) + requestedEDS, err := client.RequestEDS(ctx, roots, height, server.host.ID()) + assert.ErrorIs(t, err, shrex.ErrNotFound) assert.Nil(t, requestedEDS) // unlock write @@ -67,16 +65,17 @@ func TestExchange_RequestEDS(t *testing.T) { // wait for write to finish <-lock - requestedEDS, err = client.RequestEDS(ctx, roots.Hash(), server.host.ID()) + requestedEDS, err = client.RequestEDS(ctx, roots, height, server.host.ID()) assert.NoError(t, err) assert.Equal(t, eds.Flattened(), requestedEDS.Flattened()) }) // Testcase: Invalid request excludes peer from round-robin, stopping request t.Run("EDS_InvalidRequest", func(t *testing.T) { - dataHash := []byte("invalid") - requestedEDS, err := client.RequestEDS(ctx, dataHash, server.host.ID()) - assert.ErrorContains(t, err, "stream reset") + emptyRoot := share.EmptyEDSRoots() + height := uint64(0) + requestedEDS, err := client.RequestEDS(ctx, emptyRoot, height, server.host.ID()) + assert.ErrorIs(t, err, shwap.ErrInvalidShwapID) assert.Nil(t, requestedEDS) }) @@ -86,15 +85,15 @@ func TestExchange_RequestEDS(t *testing.T) { eds := edstest.RandEDS(t, 4) roots, err := share.NewAxisRoots(eds) require.NoError(t, err) - _, err = client.RequestEDS(timeoutCtx, roots.Hash(), server.host.ID()) - require.ErrorIs(t, err, p2p.ErrNotFound) + height := uint64(668) + _, err = client.RequestEDS(timeoutCtx, roots, height, server.host.ID()) + require.ErrorIs(t, err, shrex.ErrNotFound) }) // Testcase: Concurrency limit reached t.Run("EDS_concurrency_limit", func(t *testing.T) { - store, client, server := makeExchange(t) + _, client, server := makeExchange(t) - require.NoError(t, store.Start(ctx)) require.NoError(t, server.Start(ctx)) ctx, cancel := context.WithTimeout(ctx, time.Second) @@ -115,34 +114,25 @@ func TestExchange_RequestEDS(t *testing.T) { t.Fatal("timeout") } } - middleware := p2p.NewMiddleware(rateLimit) + middleware := shrex.NewMiddleware(rateLimit) server.host.SetStreamHandler(server.protocolID, middleware.RateLimitHandler(mockHandler)) // take server concurrency slots with blocked requests + emptyRoot := share.EmptyEDSRoots() for i := 0; i < rateLimit; i++ { go func(i int) { - client.RequestEDS(ctx, nil, server.host.ID()) //nolint:errcheck + client.RequestEDS(ctx, emptyRoot, 1, server.host.ID()) //nolint:errcheck }(i) } // wait until all server slots are taken wg.Wait() - _, err = client.RequestEDS(ctx, nil, server.host.ID()) - require.ErrorIs(t, err, p2p.ErrNotFound) + _, err = client.RequestEDS(ctx, emptyRoot, 1, server.host.ID()) + require.ErrorIs(t, err, shrex.ErrNotFound) }) } -func newStore(t *testing.T) *eds.Store { - t.Helper() - - storeCfg := eds.DefaultParameters() - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - store, err := eds.NewStore(storeCfg, t.TempDir(), ds) - require.NoError(t, err) - return store -} - func createMocknet(t *testing.T, amount int) []libhost.Host { t.Helper() @@ -152,9 +142,10 @@ func createMocknet(t *testing.T, amount int) []libhost.Host { return net.Hosts() } -func makeExchange(t *testing.T) (*eds.Store, *Client, *Server) { +func makeExchange(t *testing.T) (*store.Store, *Client, *Server) { t.Helper() - store := newStore(t) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) hosts := createMocknet(t, 2) client, err := NewClient(DefaultParameters(), hosts[0]) diff --git a/share/p2p/shrexeds/params.go b/share/shwap/p2p/shrex/shrexeds/params.go similarity index 78% rename from share/p2p/shrexeds/params.go rename to share/shwap/p2p/shrex/shrexeds/params.go index 795cb313ed..4c3667033d 100644 --- a/share/p2p/shrexeds/params.go +++ b/share/shwap/p2p/shrex/shrexeds/params.go @@ -5,16 +5,16 @@ import ( logging "github.com/ipfs/go-log/v2" - "github.com/celestiaorg/celestia-node/share/p2p" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" ) -const protocolString = "/shrex/eds/v0.0.1" +const protocolString = "/shrex/eds/v0.0.2" var log = logging.Logger("shrex/eds") // Parameters is the set of parameters that must be configured for the shrex/eds protocol. type Parameters struct { - *p2p.Parameters + *shrex.Parameters // BufferSize defines the size of the buffer used for writing an ODS over the stream. BufferSize uint64 @@ -22,7 +22,7 @@ type Parameters struct { func DefaultParameters() *Parameters { return &Parameters{ - Parameters: p2p.DefaultParameters(), + Parameters: shrex.DefaultParameters(), BufferSize: 32 * 1024, } } @@ -36,7 +36,7 @@ func (p *Parameters) Validate() error { } func (c *Client) WithMetrics() error { - metrics, err := p2p.InitClientMetrics("eds") + metrics, err := shrex.InitClientMetrics("eds") if err != nil { return fmt.Errorf("shrex/eds: init Metrics: %w", err) } @@ -45,7 +45,7 @@ func (c *Client) WithMetrics() error { } func (s *Server) WithMetrics() error { - metrics, err := p2p.InitServerMetrics("eds") + metrics, err := shrex.InitServerMetrics("eds") if err != nil { return fmt.Errorf("shrex/eds: init Metrics: %w", err) } diff --git a/share/p2p/shrexeds/pb/extended_data_square.pb.go b/share/shwap/p2p/shrex/shrexeds/pb/extended_data_square.pb.go similarity index 55% rename from share/p2p/shrexeds/pb/extended_data_square.pb.go rename to share/shwap/p2p/shrex/shrexeds/pb/extended_data_square.pb.go index ed1a96ae3b..60d26abdc2 100644 --- a/share/p2p/shrexeds/pb/extended_data_square.pb.go +++ b/share/shwap/p2p/shrex/shrexeds/pb/extended_data_square.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: share/p2p/shrexeds/pb/extended_data_square.proto +// source: share/shwap/p2p/shrex/shrexeds/pb/extended_data_square.proto -package extended_data_square +package pb import ( fmt "fmt" @@ -50,51 +50,7 @@ func (x Status) String() string { } func (Status) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_49d42aa96098056e, []int{0} -} - -type EDSRequest struct { - Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` -} - -func (m *EDSRequest) Reset() { *m = EDSRequest{} } -func (m *EDSRequest) String() string { return proto.CompactTextString(m) } -func (*EDSRequest) ProtoMessage() {} -func (*EDSRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_49d42aa96098056e, []int{0} -} -func (m *EDSRequest) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *EDSRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_EDSRequest.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *EDSRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_EDSRequest.Merge(m, src) -} -func (m *EDSRequest) XXX_Size() int { - return m.Size() -} -func (m *EDSRequest) XXX_DiscardUnknown() { - xxx_messageInfo_EDSRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_EDSRequest proto.InternalMessageInfo - -func (m *EDSRequest) GetHash() []byte { - if m != nil { - return m.Hash - } - return nil + return fileDescriptor_5176f06f10cac3fd, []int{0} } type EDSResponse struct { @@ -105,7 +61,7 @@ func (m *EDSResponse) Reset() { *m = EDSResponse{} } func (m *EDSResponse) String() string { return proto.CompactTextString(m) } func (*EDSResponse) ProtoMessage() {} func (*EDSResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_49d42aa96098056e, []int{1} + return fileDescriptor_5176f06f10cac3fd, []int{0} } func (m *EDSResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -143,61 +99,31 @@ func (m *EDSResponse) GetStatus() Status { func init() { proto.RegisterEnum("Status", Status_name, Status_value) - proto.RegisterType((*EDSRequest)(nil), "EDSRequest") proto.RegisterType((*EDSResponse)(nil), "EDSResponse") } func init() { - proto.RegisterFile("share/p2p/shrexeds/pb/extended_data_square.proto", fileDescriptor_49d42aa96098056e) -} - -var fileDescriptor_49d42aa96098056e = []byte{ - // 227 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x28, 0xce, 0x48, 0x2c, - 0x4a, 0xd5, 0x2f, 0x30, 0x2a, 0xd0, 0x2f, 0xce, 0x28, 0x4a, 0xad, 0x48, 0x4d, 0x29, 0xd6, 0x2f, - 0x48, 0xd2, 0x4f, 0xad, 0x28, 0x49, 0xcd, 0x4b, 0x49, 0x4d, 0x89, 0x4f, 0x49, 0x2c, 0x49, 0x8c, - 0x2f, 0x2e, 0x2c, 0x4d, 0x2c, 0x4a, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0x52, 0xe0, 0xe2, - 0x72, 0x75, 0x09, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x12, 0xe2, 0x62, 0xc9, 0x48, - 0x2c, 0xce, 0x90, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0xb3, 0x95, 0xf4, 0xb8, 0xb8, 0xc1, - 0x2a, 0x8a, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0xe4, 0xb9, 0xd8, 0x8a, 0x4b, 0x12, 0x4b, 0x4a, - 0x8b, 0xc1, 0x8a, 0xf8, 0x8c, 0xd8, 0xf5, 0x82, 0xc1, 0xdc, 0x20, 0xa8, 0xb0, 0x96, 0x15, 0x17, - 0x1b, 0x44, 0x44, 0x88, 0x9b, 0x8b, 0xdd, 0xd3, 0x2f, 0xcc, 0xd1, 0xc7, 0xd3, 0x45, 0x80, 0x41, - 0x88, 0x8d, 0x8b, 0xc9, 0xdf, 0x5b, 0x80, 0x51, 0x88, 0x97, 0x8b, 0xd3, 0xcf, 0x3f, 0x24, 0xde, - 0xcd, 0x3f, 0xd4, 0xcf, 0x45, 0x80, 0x49, 0x88, 0x87, 0x8b, 0xc3, 0xd3, 0x2f, 0xc4, 0x35, 0xc8, - 0xcf, 0xd1, 0x47, 0x80, 0xd9, 0x49, 0xe2, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, - 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, - 0x92, 0xd8, 0xc0, 0xce, 0x35, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x7b, 0x1d, 0xd4, 0xa7, 0xe2, - 0x00, 0x00, 0x00, -} - -func (m *EDSRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *EDSRequest) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *EDSRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Hash) > 0 { - i -= len(m.Hash) - copy(dAtA[i:], m.Hash) - i = encodeVarintExtendedDataSquare(dAtA, i, uint64(len(m.Hash))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil + proto.RegisterFile("share/shwap/p2p/shrex/shrexeds/pb/extended_data_square.proto", fileDescriptor_5176f06f10cac3fd) +} + +var fileDescriptor_5176f06f10cac3fd = []byte{ + // 244 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xb2, 0x29, 0xce, 0x48, 0x2c, + 0x4a, 0xd5, 0x2f, 0xce, 0x28, 0x4f, 0x2c, 0xd0, 0x2f, 0x30, 0x2a, 0xd0, 0x2f, 0xce, 0x28, 0x4a, + 0xad, 0x80, 0x90, 0xa9, 0x29, 0xc5, 0xfa, 0x05, 0x49, 0xfa, 0xa9, 0x15, 0x25, 0xa9, 0x79, 0x29, + 0xa9, 0x29, 0xf1, 0x29, 0x89, 0x25, 0x89, 0xf1, 0xc5, 0x85, 0xa5, 0x89, 0x45, 0xa9, 0x7a, 0x05, + 0x45, 0xf9, 0x25, 0xf9, 0x4a, 0x7a, 0x5c, 0xdc, 0xae, 0x2e, 0xc1, 0x41, 0xa9, 0xc5, 0x05, 0xf9, + 0x79, 0xc5, 0xa9, 0x42, 0xf2, 0x5c, 0x6c, 0xc5, 0x25, 0x89, 0x25, 0xa5, 0xc5, 0x12, 0x8c, 0x0a, + 0x8c, 0x1a, 0x7c, 0x46, 0xec, 0x7a, 0xc1, 0x60, 0x6e, 0x10, 0x54, 0x58, 0xcb, 0x8a, 0x8b, 0x0d, + 0x22, 0x22, 0xc4, 0xcd, 0xc5, 0xee, 0xe9, 0x17, 0xe6, 0xe8, 0xe3, 0xe9, 0x22, 0xc0, 0x20, 0xc4, + 0xc6, 0xc5, 0xe4, 0xef, 0x2d, 0xc0, 0x28, 0xc4, 0xcb, 0xc5, 0xe9, 0xe7, 0x1f, 0x12, 0xef, 0xe6, + 0x1f, 0xea, 0xe7, 0x22, 0xc0, 0x24, 0xc4, 0xc3, 0xc5, 0xe1, 0xe9, 0x17, 0xe2, 0x1a, 0xe4, 0xe7, + 0xe8, 0x23, 0xc0, 0xec, 0x94, 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, + 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, 0x0c, 0x51, + 0x6e, 0xe9, 0x99, 0x25, 0x19, 0xa5, 0x49, 0x7a, 0xc9, 0xf9, 0xb9, 0xfa, 0xc9, 0xa9, 0x39, 0xa9, + 0xc5, 0x25, 0x99, 0x89, 0xf9, 0x45, 0xe9, 0x70, 0xb6, 0x6e, 0x5e, 0x7e, 0x0a, 0xc8, 0x8b, 0x04, + 0x3c, 0x9a, 0xc4, 0x06, 0xf6, 0x94, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x91, 0x62, 0x7c, 0xa1, + 0x14, 0x01, 0x00, 0x00, } func (m *EDSResponse) Marshal() (dAtA []byte, err error) { @@ -239,19 +165,6 @@ func encodeVarintExtendedDataSquare(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *EDSRequest) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Hash) - if l > 0 { - n += 1 + l + sovExtendedDataSquare(uint64(l)) - } - return n -} - func (m *EDSResponse) Size() (n int) { if m == nil { return 0 @@ -270,90 +183,6 @@ func sovExtendedDataSquare(x uint64) (n int) { func sozExtendedDataSquare(x uint64) (n int) { return sovExtendedDataSquare(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (m *EDSRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowExtendedDataSquare - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: EDSRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: EDSRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowExtendedDataSquare - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthExtendedDataSquare - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthExtendedDataSquare - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Hash = append(m.Hash[:0], dAtA[iNdEx:postIndex]...) - if m.Hash == nil { - m.Hash = []byte{} - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipExtendedDataSquare(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthExtendedDataSquare - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} func (m *EDSResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/share/p2p/shrexeds/pb/extended_data_square.proto b/share/shwap/p2p/shrex/shrexeds/pb/extended_data_square.proto similarity index 67% rename from share/p2p/shrexeds/pb/extended_data_square.proto rename to share/shwap/p2p/shrex/shrexeds/pb/extended_data_square.proto index 63750962e9..d8d2cf6a52 100644 --- a/share/p2p/shrexeds/pb/extended_data_square.proto +++ b/share/shwap/p2p/shrex/shrexeds/pb/extended_data_square.proto @@ -1,8 +1,6 @@ syntax = "proto3"; -message EDSRequest { - bytes hash = 1; // identifies the requested EDS. -} +option go_package = "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds/pb"; enum Status { INVALID = 0; diff --git a/share/p2p/shrexeds/server.go b/share/shwap/p2p/shrex/shrexeds/server.go similarity index 64% rename from share/p2p/shrexeds/server.go rename to share/shwap/p2p/shrex/shrexeds/server.go index 15d67d2111..dbcfcadc4a 100644 --- a/share/p2p/shrexeds/server.go +++ b/share/shwap/p2p/shrex/shrexeds/server.go @@ -14,10 +14,12 @@ import ( "github.com/celestiaorg/go-libp2p-messenger/serde" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/p2p" - p2p_pb "github.com/celestiaorg/celestia-node/share/p2p/shrexeds/pb" + "github.com/celestiaorg/celestia-node/libs/utils" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" + p2p_pb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds/pb" + "github.com/celestiaorg/celestia-node/store" ) // Server is responsible for serving ODSs for blocksync over the ShrEx/EDS protocol. @@ -28,15 +30,15 @@ type Server struct { host host.Host protocolID protocol.ID - store *eds.Store + store *store.Store params *Parameters - middleware *p2p.Middleware - metrics *p2p.Metrics + middleware *shrex.Middleware + metrics *shrex.Metrics } // NewServer creates a new ShrEx/EDS server. -func NewServer(params *Parameters, host host.Host, store *eds.Store) (*Server, error) { +func NewServer(params *Parameters, host host.Host, store *store.Store) (*Server, error) { if err := params.Validate(); err != nil { return nil, fmt.Errorf("shrex-eds: server creation failed: %w", err) } @@ -44,18 +46,15 @@ func NewServer(params *Parameters, host host.Host, store *eds.Store) (*Server, e return &Server{ host: host, store: store, - protocolID: p2p.ProtocolID(params.NetworkID(), protocolString), + protocolID: shrex.ProtocolID(params.NetworkID(), protocolString), params: params, - middleware: p2p.NewMiddleware(params.ConcurrencyLimit), + middleware: shrex.NewMiddleware(params.ConcurrencyLimit), }, nil } func (s *Server) Start(context.Context) error { s.ctx, s.cancel = context.WithCancel(context.Background()) - handler := s.handleStream - withRateLimit := s.middleware.RateLimitHandler(handler) - withRecovery := p2p.RecoveryMiddleware(withRateLimit) - s.host.SetStreamHandler(s.protocolID, withRecovery) + s.host.SetStreamHandler(s.protocolID, s.middleware.RateLimitHandler(s.handleStream)) return nil } @@ -68,7 +67,7 @@ func (s *Server) Stop(context.Context) error { func (s *Server) observeRateLimitedRequests() { numRateLimited := s.middleware.DrainCounter() if numRateLimited > 0 { - s.metrics.ObserveRequests(context.Background(), numRateLimited, p2p.StatusRateLimited) + s.metrics.ObserveRequests(context.Background(), numRateLimited, shrex.StatusRateLimited) } } @@ -79,22 +78,14 @@ func (s *Server) handleStream(stream network.Stream) { s.observeRateLimitedRequests() // read request from stream to get the dataHash for store lookup - req, err := s.readRequest(logger, stream) + id, err := s.readRequest(logger, stream) if err != nil { logger.Warnw("server: reading request from stream", "err", err) stream.Reset() //nolint:errcheck return } - // ensure the requested dataHash is a valid root - hash := share.DataHash(req.Hash) - err = hash.Validate() - if err != nil { - logger.Warnw("server: invalid request", "err", err) - stream.Reset() //nolint:errcheck - return - } - logger = logger.With("hash", hash.String()) + logger = logger.With("height", id.Height) ctx, cancel := context.WithTimeout(s.ctx, s.params.HandleRequestTimeout) defer cancel() @@ -102,22 +93,18 @@ func (s *Server) handleStream(stream network.Stream) { // determine whether the EDS is available in our store // we do not close the reader, so that other requests will not need to re-open the file. // closing is handled by the LRU cache. - edsReader, err := s.store.GetCAR(ctx, hash) + file, err := s.store.GetByHeight(ctx, id.Height) var status p2p_pb.Status switch { case err == nil: - defer func() { - if err := edsReader.Close(); err != nil { - log.Warnw("closing car reader", "err", err) - } - }() + defer utils.CloseAndLog(logger, "file", file) status = p2p_pb.Status_OK - case errors.Is(err, eds.ErrNotFound): - logger.Warnw("server: request hash not found") - s.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) + case errors.Is(err, store.ErrNotFound): + logger.Warnw("server: request height not found") + s.metrics.ObserveRequests(ctx, 1, shrex.StatusNotFound) status = p2p_pb.Status_NOT_FOUND case err != nil: - logger.Errorw("server: get CAR", "err", err) + logger.Errorw("server: get file", "err", err) status = p2p_pb.Status_INTERNAL } @@ -138,37 +125,41 @@ func (s *Server) handleStream(stream network.Stream) { } // start streaming the ODS to the client - err = s.writeODS(logger, edsReader, stream) + err = s.writeODS(logger, file, stream) if err != nil { logger.Warnw("server: writing ods to stream", "err", err) stream.Reset() //nolint:errcheck return } - s.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess) + s.metrics.ObserveRequests(ctx, 1, shrex.StatusSuccess) err = stream.Close() if err != nil { logger.Debugw("server: closing stream", "err", err) } } -func (s *Server) readRequest(logger *zap.SugaredLogger, stream network.Stream) (*p2p_pb.EDSRequest, error) { +func (s *Server) readRequest(logger *zap.SugaredLogger, stream network.Stream) (shwap.EdsID, error) { err := stream.SetReadDeadline(time.Now().Add(s.params.ServerReadTimeout)) if err != nil { logger.Debugw("server: set read deadline", "err", err) } - req := new(p2p_pb.EDSRequest) - _, err = serde.Read(stream, req) + req := make([]byte, shwap.EdsIDSize) + _, err = io.ReadFull(stream, req) + if err != nil { + return shwap.EdsID{}, fmt.Errorf("reading request: %w", err) + } + id, err := shwap.EdsIDFromBinary(req) if err != nil { - return nil, err + return shwap.EdsID{}, fmt.Errorf("parsing request: %w", err) } err = stream.CloseRead() if err != nil { logger.Debugw("server: closing read", "err", err) } - return req, nil + return id, id.Validate() } func (s *Server) writeStatus(logger *zap.SugaredLogger, status p2p_pb.Status, stream network.Stream) error { @@ -182,21 +173,22 @@ func (s *Server) writeStatus(logger *zap.SugaredLogger, status p2p_pb.Status, st return err } -func (s *Server) writeODS(logger *zap.SugaredLogger, edsReader io.Reader, stream network.Stream) error { - err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout)) +func (s *Server) writeODS(logger *zap.SugaredLogger, stramer eds.Streamer, stream network.Stream) error { + reader, err := stramer.Reader() if err != nil { - logger.Debugw("server: set read deadline", "err", err) + return fmt.Errorf("getting ODS reader: %w", err) } - - odsReader, err := eds.ODSReader(edsReader) + err = stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout)) if err != nil { - return fmt.Errorf("creating ODS reader: %w", err) + logger.Debugw("server: set read deadline", "err", err) } + buf := make([]byte, s.params.BufferSize) - _, err = io.CopyBuffer(stream, odsReader, buf) + n, err := io.CopyBuffer(stream, reader, buf) if err != nil { - return fmt.Errorf("writing ODS bytes: %w", err) + return fmt.Errorf("written: %v, writing ODS bytes: %w", n, err) } + logger.Debugw("server: wrote ODS", "bytes", n) return nil } diff --git a/share/p2p/shrexnd/client.go b/share/shwap/p2p/shrex/shrexnd/client.go similarity index 73% rename from share/p2p/shrexnd/client.go rename to share/shwap/p2p/shrex/shrexnd/client.go index ad4d3a37a7..b8bf6db8e6 100644 --- a/share/p2p/shrexnd/client.go +++ b/share/shwap/p2p/shrex/shrexnd/client.go @@ -17,8 +17,9 @@ import ( "github.com/celestiaorg/nmt" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/p2p" - pb "github.com/celestiaorg/celestia-node/share/p2p/shrexnd/pb" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" + pb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexnd/pb" ) // Client implements client side of shrex/nd protocol to obtain namespaced shares data from remote @@ -28,7 +29,7 @@ type Client struct { protocolID protocol.ID host host.Host - metrics *p2p.Metrics + metrics *shrex.Metrics } // NewClient creates a new shrEx/nd client @@ -39,29 +40,29 @@ func NewClient(params *Parameters, host host.Host) (*Client, error) { return &Client{ host: host, - protocolID: p2p.ProtocolID(params.NetworkID(), protocolString), + protocolID: shrex.ProtocolID(params.NetworkID(), protocolString), params: params, }, nil } // RequestND requests namespaced data from the given peer. -// Returns NamespacedShares with unverified inclusion proofs against the share.AxisRoots. +// Returns NamespacedData with unverified inclusion proofs against the share.Root. func (c *Client) RequestND( ctx context.Context, - root *share.AxisRoots, + height uint64, namespace share.Namespace, peer peer.ID, -) (share.NamespacedShares, error) { +) (shwap.NamespacedData, error) { if err := namespace.ValidateForData(); err != nil { return nil, err } - shares, err := c.doRequest(ctx, root, namespace, peer) + shares, err := c.doRequest(ctx, height, namespace, peer) if err == nil { return shares, nil } if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusTimeout) return nil, err } // some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not @@ -69,11 +70,11 @@ func (c *Client) RequestND( var ne net.Error if errors.As(err, &ne) && ne.Timeout() { if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) { - c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusTimeout) return nil, context.DeadlineExceeded } } - if !errors.Is(err, p2p.ErrNotFound) && !errors.Is(err, p2p.ErrRateLimited) { + if !errors.Is(err, shrex.ErrNotFound) && errors.Is(err, shrex.ErrRateLimited) { log.Warnw("client-nd: peer returned err", "err", err) } return nil, err @@ -81,10 +82,10 @@ func (c *Client) RequestND( func (c *Client) doRequest( ctx context.Context, - root *share.AxisRoots, + height uint64, namespace share.Namespace, peerID peer.ID, -) (share.NamespacedShares, error) { +) (shwap.NamespacedData, error) { stream, err := c.host.NewStream(ctx, peerID, c.protocolID) if err != nil { return nil, err @@ -93,14 +94,19 @@ func (c *Client) doRequest( c.setStreamDeadlines(ctx, stream) - req := &pb.GetSharesByNamespaceRequest{ - RootHash: root.Hash(), - Namespace: namespace, + req, err := shwap.NewNamespaceDataID(height, namespace) + if err != nil { + return nil, fmt.Errorf("client-nd: creating request: %w", err) + } + + br, err := req.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("client-nd: marshaling request: %w", err) } - _, err = serde.Write(stream, req) + _, err = stream.Write(br) if err != nil { - c.metrics.ObserveRequests(ctx, 1, p2p.StatusSendReqErr) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusSendReqErr) stream.Reset() //nolint:errcheck return nil, fmt.Errorf("client-nd: writing request: %w", err) } @@ -122,10 +128,10 @@ func (c *Client) readStatus(ctx context.Context, stream network.Stream) error { if err != nil { // server is overloaded and closed the stream if errors.Is(err, io.EOF) { - c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited) - return p2p.ErrRateLimited + c.metrics.ObserveRequests(ctx, 1, shrex.StatusRateLimited) + return shrex.ErrRateLimited } - c.metrics.ObserveRequests(ctx, 1, p2p.StatusReadRespErr) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusReadRespErr) stream.Reset() //nolint:errcheck return fmt.Errorf("client-nd: reading status response: %w", err) } @@ -133,12 +139,12 @@ func (c *Client) readStatus(ctx context.Context, stream network.Stream) error { return c.convertStatusToErr(ctx, resp.Status) } -// readNamespacedShares converts proto Rows to share.NamespacedShares +// readNamespacedShares converts proto Rows to share.NamespacedData func (c *Client) readNamespacedShares( ctx context.Context, stream network.Stream, -) (share.NamespacedShares, error) { - var shares share.NamespacedShares +) (shwap.NamespacedData, error) { + var shares shwap.NamespacedData for { var row pb.NamespaceRowResponse _, err := serde.Read(stream, &row) @@ -147,7 +153,7 @@ func (c *Client) readNamespacedShares( // all data is received and steam is closed by server return shares, nil } - c.metrics.ObserveRequests(ctx, 1, p2p.StatusReadRespErr) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusReadRespErr) return nil, err } var proof nmt.Proof @@ -169,7 +175,7 @@ func (c *Client) readNamespacedShares( ) } } - shares = append(shares, share.NamespacedRow{ + shares = append(shares, shwap.RowNamespaceData{ Shares: row.Shares, Proof: &proof, }) @@ -207,17 +213,17 @@ func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream) func (c *Client) convertStatusToErr(ctx context.Context, status pb.StatusCode) error { switch status { case pb.StatusCode_OK: - c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess) + c.metrics.ObserveRequests(ctx, 1, shrex.StatusSuccess) return nil case pb.StatusCode_NOT_FOUND: - c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) - return p2p.ErrNotFound + c.metrics.ObserveRequests(ctx, 1, shrex.StatusNotFound) + return shrex.ErrNotFound case pb.StatusCode_INVALID: log.Warn("client-nd: invalid request") fallthrough case pb.StatusCode_INTERNAL: fallthrough default: - return p2p.ErrInvalidResponse + return shrex.ErrInvalidResponse } } diff --git a/share/p2p/shrexnd/doc.go b/share/shwap/p2p/shrex/shrexnd/doc.go similarity index 100% rename from share/p2p/shrexnd/doc.go rename to share/shwap/p2p/shrex/shrexnd/doc.go diff --git a/share/p2p/shrexnd/exchange_test.go b/share/shwap/p2p/shrex/shrexnd/exchange_test.go similarity index 69% rename from share/p2p/shrexnd/exchange_test.go rename to share/shwap/p2p/shrex/shrexnd/exchange_test.go index e86f72e2fe..7de8f7c6ec 100644 --- a/share/p2p/shrexnd/exchange_test.go +++ b/share/shwap/p2p/shrex/shrexnd/exchange_test.go @@ -6,35 +6,35 @@ import ( "testing" "time" - "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" libhost "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/edstest" - "github.com/celestiaorg/celestia-node/share/p2p" "github.com/celestiaorg/celestia-node/share/sharetest" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" + "github.com/celestiaorg/celestia-node/store" ) func TestExchange_RequestND_NotFound(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) edsStore, client, server := makeExchange(t) - require.NoError(t, edsStore.Start(ctx)) require.NoError(t, server.Start(ctx)) + height := atomic.NewUint64(1) + t.Run("CAR_not_exist", func(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second) t.Cleanup(cancel) - root := share.AxisRoots{} namespace := sharetest.RandV0Namespace() - _, err := client.RequestND(ctx, &root, namespace, server.host.ID()) - require.ErrorIs(t, err, p2p.ErrNotFound) + height := height.Add(1) + _, err := client.RequestND(ctx, height, namespace, server.host.ID()) + require.ErrorIs(t, err, shrex.ErrNotFound) }) t.Run("ErrNamespaceNotFound", func(t *testing.T) { @@ -44,10 +44,13 @@ func TestExchange_RequestND_NotFound(t *testing.T) { eds := edstest.RandEDS(t, 4) roots, err := share.NewAxisRoots(eds) require.NoError(t, err) - require.NoError(t, edsStore.Put(ctx, roots.Hash(), eds)) + + height := height.Add(1) + err = edsStore.Put(ctx, roots, height, eds) + require.NoError(t, err) namespace := sharetest.RandV0Namespace() - emptyShares, err := client.RequestND(ctx, roots, namespace, server.host.ID()) + emptyShares, err := client.RequestND(ctx, height, namespace, server.host.ID()) require.NoError(t, err) require.Empty(t, emptyShares.Flatten()) }) @@ -83,34 +86,24 @@ func TestExchange_RequestND(t *testing.T) { t.Fatal("timeout") } } - middleware := p2p.NewMiddleware(rateLimit) + middleware := shrex.NewMiddleware(rateLimit) server.host.SetStreamHandler(server.protocolID, middleware.RateLimitHandler(mockHandler)) // take server concurrency slots with blocked requests for i := 0; i < rateLimit; i++ { go func(i int) { - client.RequestND(ctx, nil, sharetest.RandV0Namespace(), server.host.ID()) //nolint:errcheck + client.RequestND(ctx, 1, sharetest.RandV0Namespace(), server.host.ID()) //nolint:errcheck }(i) } // wait until all server slots are taken wg.Wait() - _, err = client.RequestND(ctx, nil, sharetest.RandV0Namespace(), server.host.ID()) - require.ErrorIs(t, err, p2p.ErrRateLimited) + _, err = client.RequestND(ctx, 1, sharetest.RandV0Namespace(), server.host.ID()) + require.ErrorIs(t, err, shrex.ErrRateLimited) }) } -func newStore(t *testing.T) *eds.Store { - t.Helper() - - storeCfg := eds.DefaultParameters() - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - store, err := eds.NewStore(storeCfg, t.TempDir(), ds) - require.NoError(t, err) - return store -} - func createMocknet(t *testing.T, amount int) []libhost.Host { t.Helper() @@ -120,15 +113,16 @@ func createMocknet(t *testing.T, amount int) []libhost.Host { return net.Hosts() } -func makeExchange(t *testing.T) (*eds.Store, *Client, *Server) { +func makeExchange(t *testing.T) (*store.Store, *Client, *Server) { t.Helper() - store := newStore(t) + s, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) hosts := createMocknet(t, 2) client, err := NewClient(DefaultParameters(), hosts[0]) require.NoError(t, err) - server, err := NewServer(DefaultParameters(), hosts[1], store) + server, err := NewServer(DefaultParameters(), hosts[1], s) require.NoError(t, err) - return store, client, server + return s, client, server } diff --git a/share/p2p/shrexnd/params.go b/share/shwap/p2p/shrex/shrexnd/params.go similarity index 67% rename from share/p2p/shrexnd/params.go rename to share/shwap/p2p/shrex/shrexnd/params.go index 8489627a07..2e1acd6010 100644 --- a/share/p2p/shrexnd/params.go +++ b/share/shwap/p2p/shrex/shrexnd/params.go @@ -5,22 +5,22 @@ import ( logging "github.com/ipfs/go-log/v2" - "github.com/celestiaorg/celestia-node/share/p2p" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" ) -const protocolString = "/shrex/nd/v0.0.3" +const protocolString = "/shrex/nd/v0.0.4" var log = logging.Logger("shrex/nd") // Parameters is the set of parameters that must be configured for the shrex/eds protocol. -type Parameters = p2p.Parameters +type Parameters = shrex.Parameters func DefaultParameters() *Parameters { - return p2p.DefaultParameters() + return shrex.DefaultParameters() } func (c *Client) WithMetrics() error { - metrics, err := p2p.InitClientMetrics("nd") + metrics, err := shrex.InitClientMetrics("nd") if err != nil { return fmt.Errorf("shrex/nd: init Metrics: %w", err) } @@ -29,7 +29,7 @@ func (c *Client) WithMetrics() error { } func (srv *Server) WithMetrics() error { - metrics, err := p2p.InitServerMetrics("nd") + metrics, err := shrex.InitServerMetrics("nd") if err != nil { return fmt.Errorf("shrex/nd: init Metrics: %w", err) } diff --git a/share/p2p/shrexnd/pb/share.pb.go b/share/shwap/p2p/shrex/shrexnd/pb/row_namespace_data.pb.go similarity index 55% rename from share/p2p/shrexnd/pb/share.pb.go rename to share/shwap/p2p/shrex/shrexnd/pb/row_namespace_data.pb.go index 7e3c11416f..071046a85a 100644 --- a/share/p2p/shrexnd/pb/share.pb.go +++ b/share/shwap/p2p/shrex/shrexnd/pb/row_namespace_data.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: share/p2p/shrexnd/pb/share.proto +// source: share/shwap/p2p/shrex/shrexnd/pb/row_namespace_data.proto package share_p2p_shrex_nd @@ -51,59 +51,7 @@ func (x StatusCode) String() string { } func (StatusCode) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_ed9f13149b0de397, []int{0} -} - -type GetSharesByNamespaceRequest struct { - RootHash []byte `protobuf:"bytes,1,opt,name=root_hash,json=rootHash,proto3" json:"root_hash,omitempty"` - Namespace []byte `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` -} - -func (m *GetSharesByNamespaceRequest) Reset() { *m = GetSharesByNamespaceRequest{} } -func (m *GetSharesByNamespaceRequest) String() string { return proto.CompactTextString(m) } -func (*GetSharesByNamespaceRequest) ProtoMessage() {} -func (*GetSharesByNamespaceRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_ed9f13149b0de397, []int{0} -} -func (m *GetSharesByNamespaceRequest) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *GetSharesByNamespaceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_GetSharesByNamespaceRequest.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *GetSharesByNamespaceRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_GetSharesByNamespaceRequest.Merge(m, src) -} -func (m *GetSharesByNamespaceRequest) XXX_Size() int { - return m.Size() -} -func (m *GetSharesByNamespaceRequest) XXX_DiscardUnknown() { - xxx_messageInfo_GetSharesByNamespaceRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_GetSharesByNamespaceRequest proto.InternalMessageInfo - -func (m *GetSharesByNamespaceRequest) GetRootHash() []byte { - if m != nil { - return m.RootHash - } - return nil -} - -func (m *GetSharesByNamespaceRequest) GetNamespace() []byte { - if m != nil { - return m.Namespace - } - return nil + return fileDescriptor_e8097b1aa3ae2e25, []int{0} } type GetSharesByNamespaceStatusResponse struct { @@ -114,7 +62,7 @@ func (m *GetSharesByNamespaceStatusResponse) Reset() { *m = GetSharesByN func (m *GetSharesByNamespaceStatusResponse) String() string { return proto.CompactTextString(m) } func (*GetSharesByNamespaceStatusResponse) ProtoMessage() {} func (*GetSharesByNamespaceStatusResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_ed9f13149b0de397, []int{1} + return fileDescriptor_e8097b1aa3ae2e25, []int{0} } func (m *GetSharesByNamespaceStatusResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -159,7 +107,7 @@ func (m *NamespaceRowResponse) Reset() { *m = NamespaceRowResponse{} } func (m *NamespaceRowResponse) String() string { return proto.CompactTextString(m) } func (*NamespaceRowResponse) ProtoMessage() {} func (*NamespaceRowResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_ed9f13149b0de397, []int{2} + return fileDescriptor_e8097b1aa3ae2e25, []int{1} } func (m *NamespaceRowResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -204,73 +152,35 @@ func (m *NamespaceRowResponse) GetProof() *pb.Proof { func init() { proto.RegisterEnum("share.p2p.shrex.nd.StatusCode", StatusCode_name, StatusCode_value) - proto.RegisterType((*GetSharesByNamespaceRequest)(nil), "share.p2p.shrex.nd.GetSharesByNamespaceRequest") proto.RegisterType((*GetSharesByNamespaceStatusResponse)(nil), "share.p2p.shrex.nd.GetSharesByNamespaceStatusResponse") proto.RegisterType((*NamespaceRowResponse)(nil), "share.p2p.shrex.nd.NamespaceRowResponse") } -func init() { proto.RegisterFile("share/p2p/shrexnd/pb/share.proto", fileDescriptor_ed9f13149b0de397) } - -var fileDescriptor_ed9f13149b0de397 = []byte{ - // 326 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0x4f, 0x4b, 0xf3, 0x40, - 0x10, 0xc6, 0x93, 0x96, 0x37, 0x6f, 0x3b, 0xad, 0x35, 0x2c, 0x22, 0xc5, 0xca, 0x52, 0x02, 0x42, - 0xf1, 0xb0, 0x81, 0x08, 0x1e, 0x85, 0xd6, 0xfa, 0xa7, 0x58, 0x52, 0xd9, 0xb6, 0xe2, 0x41, 0x28, - 0x1b, 0xbb, 0x92, 0x8b, 0xd9, 0x35, 0xbb, 0x45, 0xfd, 0x16, 0x7e, 0x2c, 0x8f, 0x3d, 0x7a, 0x94, - 0xf6, 0x8b, 0x48, 0xb6, 0xd1, 0x1c, 0xf4, 0xb6, 0xf3, 0xcc, 0x33, 0xbf, 0x7d, 0x66, 0xa0, 0xad, - 0x62, 0x96, 0x72, 0x5f, 0x06, 0xd2, 0x57, 0x71, 0xca, 0x5f, 0x92, 0xb9, 0x2f, 0x23, 0xdf, 0x88, - 0x44, 0xa6, 0x42, 0x0b, 0x84, 0xf2, 0x22, 0x90, 0xc4, 0x38, 0x48, 0x32, 0xdf, 0x6b, 0xc8, 0xc8, - 0x97, 0xa9, 0x10, 0x0f, 0x1b, 0x8f, 0x77, 0x0b, 0xad, 0x0b, 0xae, 0xc7, 0x99, 0x51, 0xf5, 0x5e, - 0x43, 0xf6, 0xc8, 0x95, 0x64, 0xf7, 0x9c, 0xf2, 0xa7, 0x05, 0x57, 0x1a, 0xb5, 0xa0, 0x9a, 0x0a, - 0xa1, 0x67, 0x31, 0x53, 0x71, 0xd3, 0x6e, 0xdb, 0x9d, 0x3a, 0xad, 0x64, 0xc2, 0x25, 0x53, 0x31, - 0xda, 0x87, 0x6a, 0xf2, 0x3d, 0xd0, 0x2c, 0x99, 0x66, 0x21, 0x78, 0x77, 0xe0, 0xfd, 0x45, 0x1e, - 0x6b, 0xa6, 0x17, 0x8a, 0x72, 0x25, 0x45, 0xa2, 0x38, 0x3a, 0x06, 0x47, 0x19, 0xc5, 0xd0, 0x1b, - 0x01, 0x26, 0xbf, 0x43, 0x93, 0xcd, 0xcc, 0xa9, 0x98, 0x73, 0x9a, 0xbb, 0xbd, 0x29, 0xec, 0x14, - 0x61, 0xc5, 0xf3, 0x0f, 0x6f, 0x17, 0x1c, 0x03, 0xc8, 0x78, 0xe5, 0x4e, 0x9d, 0xe6, 0x15, 0x3a, - 0x80, 0x7f, 0x66, 0x6d, 0x93, 0xb3, 0x16, 0x6c, 0x93, 0xfc, 0x08, 0x11, 0xb9, 0xce, 0x1e, 0x74, - 0xd3, 0x3d, 0x3c, 0x01, 0x28, 0x3e, 0x43, 0x35, 0xf8, 0x3f, 0x08, 0x6f, 0xba, 0xc3, 0x41, 0xdf, - 0xb5, 0x90, 0x03, 0xa5, 0xd1, 0x95, 0x6b, 0xa3, 0x2d, 0xa8, 0x86, 0xa3, 0xc9, 0xec, 0x7c, 0x34, - 0x0d, 0xfb, 0x6e, 0x09, 0xd5, 0xa1, 0x32, 0x08, 0x27, 0x67, 0x34, 0xec, 0x0e, 0xdd, 0x72, 0xaf, - 0xf9, 0xbe, 0xc2, 0xf6, 0x72, 0x85, 0xed, 0xcf, 0x15, 0xb6, 0xdf, 0xd6, 0xd8, 0x5a, 0xae, 0xb1, - 0xf5, 0xb1, 0xc6, 0x56, 0xe4, 0x98, 0x7b, 0x1f, 0x7d, 0x05, 0x00, 0x00, 0xff, 0xff, 0x1a, 0x53, - 0xb4, 0x86, 0xb7, 0x01, 0x00, 0x00, -} - -func (m *GetSharesByNamespaceRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *GetSharesByNamespaceRequest) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *GetSharesByNamespaceRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Namespace) > 0 { - i -= len(m.Namespace) - copy(dAtA[i:], m.Namespace) - i = encodeVarintShare(dAtA, i, uint64(len(m.Namespace))) - i-- - dAtA[i] = 0x12 - } - if len(m.RootHash) > 0 { - i -= len(m.RootHash) - copy(dAtA[i:], m.RootHash) - i = encodeVarintShare(dAtA, i, uint64(len(m.RootHash))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil +func init() { + proto.RegisterFile("share/shwap/p2p/shrex/shrexnd/pb/row_namespace_data.proto", fileDescriptor_e8097b1aa3ae2e25) +} + +var fileDescriptor_e8097b1aa3ae2e25 = []byte{ + // 301 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xb2, 0x2c, 0xce, 0x48, 0x2c, + 0x4a, 0xd5, 0x2f, 0xce, 0x28, 0x4f, 0x2c, 0xd0, 0x2f, 0x30, 0x2a, 0xd0, 0x2f, 0xce, 0x28, 0x4a, + 0xad, 0x80, 0x90, 0x79, 0x29, 0xfa, 0x05, 0x49, 0xfa, 0x45, 0xf9, 0xe5, 0xf1, 0x79, 0x89, 0xb9, + 0xa9, 0xc5, 0x05, 0x89, 0xc9, 0xa9, 0xf1, 0x29, 0x89, 0x25, 0x89, 0x7a, 0x05, 0x45, 0xf9, 0x25, + 0xf9, 0x42, 0x42, 0x60, 0xad, 0x7a, 0x05, 0x46, 0x05, 0x7a, 0x60, 0xe5, 0x7a, 0x79, 0x29, 0x52, + 0x7c, 0x05, 0x49, 0xfa, 0x05, 0x45, 0xf9, 0xf9, 0x69, 0x10, 0x35, 0x4a, 0x31, 0x5c, 0x4a, 0xee, + 0xa9, 0x25, 0xc1, 0x20, 0x85, 0xc5, 0x4e, 0x95, 0x7e, 0x30, 0x63, 0x82, 0x4b, 0x12, 0x4b, 0x4a, + 0x8b, 0x83, 0x52, 0x8b, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0xcc, 0xb8, 0xd8, 0x8a, 0xc1, 0x22, + 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x7c, 0x46, 0x72, 0x7a, 0x98, 0x46, 0xeb, 0x41, 0xf4, 0x38, 0xe7, + 0xa7, 0xa4, 0x06, 0x41, 0x55, 0x2b, 0x85, 0x72, 0x89, 0xc0, 0x8d, 0x0c, 0xca, 0x2f, 0x87, 0x9b, + 0x27, 0xc6, 0xc5, 0x06, 0x36, 0x00, 0x64, 0x1e, 0xb3, 0x06, 0x4f, 0x10, 0x94, 0x27, 0xa4, 0xca, + 0xc5, 0x0a, 0x76, 0x9c, 0x04, 0x93, 0x02, 0xa3, 0x06, 0xb7, 0x11, 0xbf, 0x1e, 0xd4, 0xa9, 0x49, + 0x7a, 0x01, 0x20, 0x46, 0x10, 0x44, 0x56, 0xcb, 0x8e, 0x8b, 0x0b, 0x61, 0x99, 0x10, 0x37, 0x17, + 0xbb, 0xa7, 0x5f, 0x98, 0xa3, 0x8f, 0xa7, 0x8b, 0x00, 0x83, 0x10, 0x1b, 0x17, 0x93, 0xbf, 0xb7, + 0x00, 0xa3, 0x10, 0x2f, 0x17, 0xa7, 0x9f, 0x7f, 0x48, 0xbc, 0x9b, 0x7f, 0xa8, 0x9f, 0x8b, 0x00, + 0x93, 0x10, 0x0f, 0x17, 0x87, 0xa7, 0x5f, 0x88, 0x6b, 0x90, 0x9f, 0xa3, 0x8f, 0x00, 0xb3, 0x93, + 0xc4, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, + 0x1c, 0xc3, 0x85, 0xc7, 0x72, 0x0c, 0x37, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0x43, 0xc5, 0x18, + 0x10, 0x00, 0x00, 0xff, 0xff, 0x82, 0x3a, 0xfe, 0x72, 0x76, 0x01, 0x00, 0x00, } func (m *GetSharesByNamespaceStatusResponse) Marshal() (dAtA []byte, err error) { @@ -294,7 +204,7 @@ func (m *GetSharesByNamespaceStatusResponse) MarshalToSizedBuffer(dAtA []byte) ( var l int _ = l if m.Status != 0 { - i = encodeVarintShare(dAtA, i, uint64(m.Status)) + i = encodeVarintRowNamespaceData(dAtA, i, uint64(m.Status)) i-- dAtA[i] = 0x8 } @@ -328,7 +238,7 @@ func (m *NamespaceRowResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return 0, err } i -= size - i = encodeVarintShare(dAtA, i, uint64(size)) + i = encodeVarintRowNamespaceData(dAtA, i, uint64(size)) } i-- dAtA[i] = 0x12 @@ -337,7 +247,7 @@ func (m *NamespaceRowResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { for iNdEx := len(m.Shares) - 1; iNdEx >= 0; iNdEx-- { i -= len(m.Shares[iNdEx]) copy(dAtA[i:], m.Shares[iNdEx]) - i = encodeVarintShare(dAtA, i, uint64(len(m.Shares[iNdEx]))) + i = encodeVarintRowNamespaceData(dAtA, i, uint64(len(m.Shares[iNdEx]))) i-- dAtA[i] = 0xa } @@ -345,8 +255,8 @@ func (m *NamespaceRowResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func encodeVarintShare(dAtA []byte, offset int, v uint64) int { - offset -= sovShare(v) +func encodeVarintRowNamespaceData(dAtA []byte, offset int, v uint64) int { + offset -= sovRowNamespaceData(v) base := offset for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -356,23 +266,6 @@ func encodeVarintShare(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *GetSharesByNamespaceRequest) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.RootHash) - if l > 0 { - n += 1 + l + sovShare(uint64(l)) - } - l = len(m.Namespace) - if l > 0 { - n += 1 + l + sovShare(uint64(l)) - } - return n -} - func (m *GetSharesByNamespaceStatusResponse) Size() (n int) { if m == nil { return 0 @@ -380,7 +273,7 @@ func (m *GetSharesByNamespaceStatusResponse) Size() (n int) { var l int _ = l if m.Status != 0 { - n += 1 + sovShare(uint64(m.Status)) + n += 1 + sovRowNamespaceData(uint64(m.Status)) } return n } @@ -394,139 +287,21 @@ func (m *NamespaceRowResponse) Size() (n int) { if len(m.Shares) > 0 { for _, b := range m.Shares { l = len(b) - n += 1 + l + sovShare(uint64(l)) + n += 1 + l + sovRowNamespaceData(uint64(l)) } } if m.Proof != nil { l = m.Proof.Size() - n += 1 + l + sovShare(uint64(l)) + n += 1 + l + sovRowNamespaceData(uint64(l)) } return n } -func sovShare(x uint64) (n int) { +func sovRowNamespaceData(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } -func sozShare(x uint64) (n int) { - return sovShare(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (m *GetSharesByNamespaceRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowShare - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: GetSharesByNamespaceRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: GetSharesByNamespaceRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field RootHash", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowShare - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthShare - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthShare - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.RootHash = append(m.RootHash[:0], dAtA[iNdEx:postIndex]...) - if m.RootHash == nil { - m.RootHash = []byte{} - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowShare - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthShare - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthShare - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Namespace = append(m.Namespace[:0], dAtA[iNdEx:postIndex]...) - if m.Namespace == nil { - m.Namespace = []byte{} - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipShare(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthShare - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil +func sozRowNamespaceData(x uint64) (n int) { + return sovRowNamespaceData(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } func (m *GetSharesByNamespaceStatusResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) @@ -536,7 +311,7 @@ func (m *GetSharesByNamespaceStatusResponse) Unmarshal(dAtA []byte) error { var wire uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { - return ErrIntOverflowShare + return ErrIntOverflowRowNamespaceData } if iNdEx >= l { return io.ErrUnexpectedEOF @@ -564,7 +339,7 @@ func (m *GetSharesByNamespaceStatusResponse) Unmarshal(dAtA []byte) error { m.Status = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { - return ErrIntOverflowShare + return ErrIntOverflowRowNamespaceData } if iNdEx >= l { return io.ErrUnexpectedEOF @@ -578,12 +353,12 @@ func (m *GetSharesByNamespaceStatusResponse) Unmarshal(dAtA []byte) error { } default: iNdEx = preIndex - skippy, err := skipShare(dAtA[iNdEx:]) + skippy, err := skipRowNamespaceData(dAtA[iNdEx:]) if err != nil { return err } if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthShare + return ErrInvalidLengthRowNamespaceData } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF @@ -605,7 +380,7 @@ func (m *NamespaceRowResponse) Unmarshal(dAtA []byte) error { var wire uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { - return ErrIntOverflowShare + return ErrIntOverflowRowNamespaceData } if iNdEx >= l { return io.ErrUnexpectedEOF @@ -633,7 +408,7 @@ func (m *NamespaceRowResponse) Unmarshal(dAtA []byte) error { var byteLen int for shift := uint(0); ; shift += 7 { if shift >= 64 { - return ErrIntOverflowShare + return ErrIntOverflowRowNamespaceData } if iNdEx >= l { return io.ErrUnexpectedEOF @@ -646,11 +421,11 @@ func (m *NamespaceRowResponse) Unmarshal(dAtA []byte) error { } } if byteLen < 0 { - return ErrInvalidLengthShare + return ErrInvalidLengthRowNamespaceData } postIndex := iNdEx + byteLen if postIndex < 0 { - return ErrInvalidLengthShare + return ErrInvalidLengthRowNamespaceData } if postIndex > l { return io.ErrUnexpectedEOF @@ -665,7 +440,7 @@ func (m *NamespaceRowResponse) Unmarshal(dAtA []byte) error { var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { - return ErrIntOverflowShare + return ErrIntOverflowRowNamespaceData } if iNdEx >= l { return io.ErrUnexpectedEOF @@ -678,11 +453,11 @@ func (m *NamespaceRowResponse) Unmarshal(dAtA []byte) error { } } if msglen < 0 { - return ErrInvalidLengthShare + return ErrInvalidLengthRowNamespaceData } postIndex := iNdEx + msglen if postIndex < 0 { - return ErrInvalidLengthShare + return ErrInvalidLengthRowNamespaceData } if postIndex > l { return io.ErrUnexpectedEOF @@ -696,12 +471,12 @@ func (m *NamespaceRowResponse) Unmarshal(dAtA []byte) error { iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipShare(dAtA[iNdEx:]) + skippy, err := skipRowNamespaceData(dAtA[iNdEx:]) if err != nil { return err } if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthShare + return ErrInvalidLengthRowNamespaceData } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF @@ -715,7 +490,7 @@ func (m *NamespaceRowResponse) Unmarshal(dAtA []byte) error { } return nil } -func skipShare(dAtA []byte) (n int, err error) { +func skipRowNamespaceData(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 depth := 0 @@ -723,7 +498,7 @@ func skipShare(dAtA []byte) (n int, err error) { var wire uint64 for shift := uint(0); ; shift += 7 { if shift >= 64 { - return 0, ErrIntOverflowShare + return 0, ErrIntOverflowRowNamespaceData } if iNdEx >= l { return 0, io.ErrUnexpectedEOF @@ -740,7 +515,7 @@ func skipShare(dAtA []byte) (n int, err error) { case 0: for shift := uint(0); ; shift += 7 { if shift >= 64 { - return 0, ErrIntOverflowShare + return 0, ErrIntOverflowRowNamespaceData } if iNdEx >= l { return 0, io.ErrUnexpectedEOF @@ -756,7 +531,7 @@ func skipShare(dAtA []byte) (n int, err error) { var length int for shift := uint(0); ; shift += 7 { if shift >= 64 { - return 0, ErrIntOverflowShare + return 0, ErrIntOverflowRowNamespaceData } if iNdEx >= l { return 0, io.ErrUnexpectedEOF @@ -769,14 +544,14 @@ func skipShare(dAtA []byte) (n int, err error) { } } if length < 0 { - return 0, ErrInvalidLengthShare + return 0, ErrInvalidLengthRowNamespaceData } iNdEx += length case 3: depth++ case 4: if depth == 0 { - return 0, ErrUnexpectedEndOfGroupShare + return 0, ErrUnexpectedEndOfGroupRowNamespaceData } depth-- case 5: @@ -785,7 +560,7 @@ func skipShare(dAtA []byte) (n int, err error) { return 0, fmt.Errorf("proto: illegal wireType %d", wireType) } if iNdEx < 0 { - return 0, ErrInvalidLengthShare + return 0, ErrInvalidLengthRowNamespaceData } if depth == 0 { return iNdEx, nil @@ -795,7 +570,7 @@ func skipShare(dAtA []byte) (n int, err error) { } var ( - ErrInvalidLengthShare = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowShare = fmt.Errorf("proto: integer overflow") - ErrUnexpectedEndOfGroupShare = fmt.Errorf("proto: unexpected end of group") + ErrInvalidLengthRowNamespaceData = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRowNamespaceData = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupRowNamespaceData = fmt.Errorf("proto: unexpected end of group") ) diff --git a/share/p2p/shrexnd/pb/share.proto b/share/shwap/p2p/shrex/shrexnd/pb/row_namespace_data.proto similarity index 78% rename from share/p2p/shrexnd/pb/share.proto rename to share/shwap/p2p/shrex/shrexnd/pb/row_namespace_data.proto index a5bdbfa071..4f484a8c2b 100644 --- a/share/p2p/shrexnd/pb/share.proto +++ b/share/shwap/p2p/shrex/shrexnd/pb/row_namespace_data.proto @@ -3,11 +3,6 @@ syntax = "proto3"; package share.p2p.shrex.nd; import "pb/proof.proto"; -message GetSharesByNamespaceRequest{ - bytes root_hash = 1; - bytes namespace = 2; -} - message GetSharesByNamespaceStatusResponse{ StatusCode status = 1; } diff --git a/share/p2p/shrexnd/server.go b/share/shwap/p2p/shrex/shrexnd/server.go similarity index 64% rename from share/p2p/shrexnd/server.go rename to share/shwap/p2p/shrex/shrexnd/server.go index 123d581494..784d459614 100644 --- a/share/p2p/shrexnd/server.go +++ b/share/shwap/p2p/shrex/shrexnd/server.go @@ -2,9 +2,9 @@ package shrexnd import ( "context" - "crypto/sha256" "errors" "fmt" + "io" "time" "github.com/libp2p/go-libp2p/core/host" @@ -15,10 +15,12 @@ import ( "github.com/celestiaorg/go-libp2p-messenger/serde" nmt_pb "github.com/celestiaorg/nmt/pb" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/p2p" - pb "github.com/celestiaorg/celestia-node/share/p2p/shrexnd/pb" + "github.com/celestiaorg/celestia-node/libs/utils" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" + pb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexnd/pb" + "github.com/celestiaorg/celestia-node/store" ) // Server implements server side of shrex/nd protocol to serve namespaced share to remote @@ -30,15 +32,15 @@ type Server struct { protocolID protocol.ID handler network.StreamHandler - store *eds.Store + store *store.Store params *Parameters - middleware *p2p.Middleware - metrics *p2p.Metrics + middleware *shrex.Middleware + metrics *shrex.Metrics } // NewServer creates new Server -func NewServer(params *Parameters, host host.Host, store *eds.Store) (*Server, error) { +func NewServer(params *Parameters, host host.Host, store *store.Store) (*Server, error) { if err := params.Validate(); err != nil { return nil, fmt.Errorf("shrex-nd: server creation failed: %w", err) } @@ -47,17 +49,14 @@ func NewServer(params *Parameters, host host.Host, store *eds.Store) (*Server, e store: store, host: host, params: params, - protocolID: p2p.ProtocolID(params.NetworkID(), protocolString), - middleware: p2p.NewMiddleware(params.ConcurrencyLimit), + protocolID: shrex.ProtocolID(params.NetworkID(), protocolString), + middleware: shrex.NewMiddleware(params.ConcurrencyLimit), } ctx, cancel := context.WithCancel(context.Background()) srv.cancel = cancel - handler := srv.streamHandler(ctx) - withRateLimit := srv.middleware.RateLimitHandler(handler) - withRecovery := p2p.RecoveryMiddleware(withRateLimit) - srv.handler = withRecovery + srv.handler = srv.middleware.RateLimitHandler(srv.streamHandler(ctx)) return srv, nil } @@ -95,7 +94,7 @@ func (srv *Server) SetHandler(handler network.StreamHandler) { func (srv *Server) observeRateLimitedRequests() { numRateLimited := srv.middleware.DrainCounter() if numRateLimited > 0 { - srv.metrics.ObserveRequests(context.Background(), numRateLimited, p2p.StatusRateLimited) + srv.metrics.ObserveRequests(context.Background(), numRateLimited, shrex.StatusRateLimited) } } @@ -104,26 +103,28 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre logger.Debug("handling nd request") srv.observeRateLimitedRequests() - req, err := srv.readRequest(logger, stream) + ndid, err := srv.readRequest(logger, stream) if err != nil { logger.Warnw("read request", "err", err) - srv.metrics.ObserveRequests(ctx, 1, p2p.StatusBadRequest) + srv.metrics.ObserveRequests(ctx, 1, shrex.StatusBadRequest) return err } - logger = logger.With("namespace", share.Namespace(req.Namespace).String(), - "hash", share.DataHash(req.RootHash).String()) + logger = logger.With( + "namespace", ndid.DataNamespace.String(), + "height", ndid.Height, + ) ctx, cancel := context.WithTimeout(ctx, srv.params.HandleRequestTimeout) defer cancel() - shares, status, err := srv.getNamespaceData(ctx, req.RootHash, req.Namespace) + shares, status, err := srv.getNamespaceData(ctx, ndid) if err != nil { // server should respond with status regardless if there was an error getting data sendErr := srv.respondStatus(ctx, logger, stream, status) if sendErr != nil { logger.Errorw("sending response", "err", sendErr) - srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSendRespErr) + srv.metrics.ObserveRequests(ctx, 1, shrex.StatusSendRespErr) } logger.Errorw("handling request", "err", err) return errors.Join(err, sendErr) @@ -132,14 +133,14 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre err = srv.respondStatus(ctx, logger, stream, status) if err != nil { logger.Errorw("sending response", "err", err) - srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSendRespErr) + srv.metrics.ObserveRequests(ctx, 1, shrex.StatusSendRespErr) return err } err = srv.sendNamespacedShares(shares, stream) if err != nil { logger.Errorw("send nd data", "err", err) - srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSendRespErr) + srv.metrics.ObserveRequests(ctx, 1, shrex.StatusSendRespErr) return err } return nil @@ -148,16 +149,20 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre func (srv *Server) readRequest( logger *zap.SugaredLogger, stream network.Stream, -) (*pb.GetSharesByNamespaceRequest, error) { +) (shwap.NamespaceDataID, error) { err := stream.SetReadDeadline(time.Now().Add(srv.params.ServerReadTimeout)) if err != nil { logger.Debugw("setting read deadline", "err", err) } - var req pb.GetSharesByNamespaceRequest - _, err = serde.Read(stream, &req) + req := make([]byte, shwap.NamespaceDataIDSize) + _, err = io.ReadFull(stream, req) if err != nil { - return nil, fmt.Errorf("reading request: %w", err) + return shwap.NamespaceDataID{}, fmt.Errorf("reading request: %w", err) + } + id, err := shwap.NamespaceDataIDFromBinary(req) + if err != nil { + return shwap.NamespaceDataID{}, fmt.Errorf("decoding request: %w", err) } logger.Debugw("new request") @@ -166,30 +171,25 @@ func (srv *Server) readRequest( logger.Debugw("closing read side of the stream", "err", err) } - err = validateRequest(req) - if err != nil { - return nil, fmt.Errorf("invalid request: %w", err) - } - return &req, nil + return id, nil } -func (srv *Server) getNamespaceData(ctx context.Context, - hash share.DataHash, namespace share.Namespace, -) (share.NamespacedShares, pb.StatusCode, error) { - roots, err := srv.store.GetDAH(ctx, hash) +func (srv *Server) getNamespaceData(ctx context.Context, id shwap.NamespaceDataID) (shwap.NamespacedData, pb.StatusCode, error) { + file, err := srv.store.GetByHeight(ctx, id.Height) + if errors.Is(err, store.ErrNotFound) { + return nil, pb.StatusCode_NOT_FOUND, nil + } if err != nil { - if errors.Is(err, eds.ErrNotFound) { - return nil, pb.StatusCode_NOT_FOUND, nil - } return nil, pb.StatusCode_INTERNAL, fmt.Errorf("retrieving DAH: %w", err) } + defer utils.CloseAndLog(log, "file", file) - shares, err := eds.RetrieveNamespaceFromStore(ctx, srv.store, roots, namespace) + nd, err := eds.NamespacedData(ctx, file, id.DataNamespace) if err != nil { - return nil, pb.StatusCode_INTERNAL, fmt.Errorf("retrieving shares: %w", err) + return nil, pb.StatusCode_INTERNAL, fmt.Errorf("getting nd: %w", err) } - return shares, pb.StatusCode_OK, nil + return nd, pb.StatusCode_OK, nil } func (srv *Server) respondStatus( @@ -214,8 +214,8 @@ func (srv *Server) respondStatus( } // sendNamespacedShares encodes shares into proto messages and sends it to client -func (srv *Server) sendNamespacedShares(shares share.NamespacedShares, stream network.Stream) error { - for _, row := range shares { +func (srv *Server) sendNamespacedShares(data shwap.NamespacedData, stream network.Stream) error { + for _, row := range data { row := &pb.NamespaceRowResponse{ Shares: row.Shares, Proof: &nmt_pb.Proof{ @@ -237,21 +237,10 @@ func (srv *Server) sendNamespacedShares(shares share.NamespacedShares, stream ne func (srv *Server) observeStatus(ctx context.Context, status pb.StatusCode) { switch { case status == pb.StatusCode_OK: - srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess) + srv.metrics.ObserveRequests(ctx, 1, shrex.StatusSuccess) case status == pb.StatusCode_NOT_FOUND: - srv.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) + srv.metrics.ObserveRequests(ctx, 1, shrex.StatusNotFound) case status == pb.StatusCode_INTERNAL: - srv.metrics.ObserveRequests(ctx, 1, p2p.StatusInternalErr) + srv.metrics.ObserveRequests(ctx, 1, shrex.StatusInternalErr) } } - -// validateRequest checks correctness of the request -func validateRequest(req pb.GetSharesByNamespaceRequest) error { - if err := share.Namespace(req.Namespace).ValidateForData(); err != nil { - return err - } - if len(req.RootHash) != sha256.Size { - return fmt.Errorf("incorrect root hash length: %v", len(req.RootHash)) - } - return nil -} diff --git a/share/p2p/shrexsub/doc.go b/share/shwap/p2p/shrex/shrexsub/doc.go similarity index 100% rename from share/p2p/shrexsub/doc.go rename to share/shwap/p2p/shrex/shrexsub/doc.go diff --git a/share/p2p/shrexsub/pb/notification.pb.go b/share/shwap/p2p/shrex/shrexsub/pb/notification.pb.go similarity index 85% rename from share/p2p/shrexsub/pb/notification.pb.go rename to share/shwap/p2p/shrex/shrexsub/pb/notification.pb.go index e154dc62b7..c7cddbba5c 100644 --- a/share/p2p/shrexsub/pb/notification.pb.go +++ b/share/shwap/p2p/shrex/shrexsub/pb/notification.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: share/p2p/shrexsub/pb/notification.proto +// source: share/shwap/p2p/shrex/shrexsub/pb/notification.proto package share_p2p_shrex_sub @@ -31,7 +31,7 @@ func (m *RecentEDSNotification) Reset() { *m = RecentEDSNotification{} } func (m *RecentEDSNotification) String() string { return proto.CompactTextString(m) } func (*RecentEDSNotification) ProtoMessage() {} func (*RecentEDSNotification) Descriptor() ([]byte, []int) { - return fileDescriptor_1a6ade914b560e62, []int{0} + return fileDescriptor_c16b670e7e556100, []int{0} } func (m *RecentEDSNotification) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -79,22 +79,23 @@ func init() { } func init() { - proto.RegisterFile("share/p2p/shrexsub/pb/notification.proto", fileDescriptor_1a6ade914b560e62) + proto.RegisterFile("share/shwap/p2p/shrex/shrexsub/pb/notification.proto", fileDescriptor_c16b670e7e556100) } -var fileDescriptor_1a6ade914b560e62 = []byte{ - // 176 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x28, 0xce, 0x48, 0x2c, - 0x4a, 0xd5, 0x2f, 0x30, 0x2a, 0xd0, 0x2f, 0xce, 0x28, 0x4a, 0xad, 0x28, 0x2e, 0x4d, 0xd2, 0x2f, - 0x48, 0xd2, 0xcf, 0xcb, 0x2f, 0xc9, 0x4c, 0xcb, 0x4c, 0x4e, 0x2c, 0xc9, 0xcc, 0xcf, 0xd3, 0x2b, - 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x06, 0xab, 0xd4, 0x2b, 0x30, 0x2a, 0xd0, 0x03, 0xab, 0xd4, - 0x2b, 0x2e, 0x4d, 0x52, 0xf2, 0xe1, 0x12, 0x0d, 0x4a, 0x4d, 0x4e, 0xcd, 0x2b, 0x71, 0x75, 0x09, - 0xf6, 0x43, 0xd2, 0x23, 0x24, 0xc6, 0xc5, 0x96, 0x91, 0x9a, 0x99, 0x9e, 0x51, 0x22, 0xc1, 0xa8, - 0xc0, 0xa8, 0xc1, 0x12, 0x04, 0xe5, 0x09, 0x49, 0x73, 0x71, 0xa6, 0x24, 0x96, 0x24, 0xc6, 0x67, - 0x24, 0x16, 0x67, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xf0, 0x04, 0x71, 0x80, 0x04, 0x3c, 0x12, 0x8b, - 0x33, 0x9c, 0x24, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, - 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, 0x8e, 0x21, 0x89, 0x0d, 0xec, - 0x06, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x99, 0x16, 0xea, 0xc6, 0xaf, 0x00, 0x00, 0x00, +var fileDescriptor_c16b670e7e556100 = []byte{ + // 183 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x29, 0xce, 0x48, 0x2c, + 0x4a, 0xd5, 0x2f, 0xce, 0x28, 0x4f, 0x2c, 0xd0, 0x2f, 0x30, 0x2a, 0xd0, 0x2f, 0xce, 0x28, 0x4a, + 0xad, 0x80, 0x90, 0xc5, 0xa5, 0x49, 0xfa, 0x05, 0x49, 0xfa, 0x79, 0xf9, 0x25, 0x99, 0x69, 0x99, + 0xc9, 0x89, 0x25, 0x99, 0xf9, 0x79, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0xc2, 0x60, 0x5d, + 0x7a, 0x05, 0x46, 0x05, 0x7a, 0x60, 0x95, 0x7a, 0xc5, 0xa5, 0x49, 0x4a, 0x3e, 0x5c, 0xa2, 0x41, + 0xa9, 0xc9, 0xa9, 0x79, 0x25, 0xae, 0x2e, 0xc1, 0x7e, 0x48, 0x7a, 0x84, 0xc4, 0xb8, 0xd8, 0x32, + 0x52, 0x33, 0xd3, 0x33, 0x4a, 0x24, 0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0xa0, 0x3c, 0x21, 0x69, + 0x2e, 0xce, 0x94, 0xc4, 0x92, 0xc4, 0xf8, 0x8c, 0xc4, 0xe2, 0x0c, 0x09, 0x26, 0x05, 0x46, 0x0d, + 0x9e, 0x20, 0x0e, 0x90, 0x80, 0x47, 0x62, 0x71, 0x86, 0x93, 0xc4, 0x89, 0x47, 0x72, 0x8c, 0x17, + 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85, 0xc7, 0x72, 0x0c, + 0x37, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0xdd, 0x60, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xc0, + 0x55, 0x8a, 0x06, 0xbb, 0x00, 0x00, 0x00, } func (m *RecentEDSNotification) Marshal() (dAtA []byte, err error) { diff --git a/share/p2p/shrexsub/pb/notification.proto b/share/shwap/p2p/shrex/shrexsub/pb/notification.proto similarity index 100% rename from share/p2p/shrexsub/pb/notification.proto rename to share/shwap/p2p/shrex/shrexsub/pb/notification.proto diff --git a/share/p2p/shrexsub/pubsub.go b/share/shwap/p2p/shrex/shrexsub/pubsub.go similarity index 96% rename from share/p2p/shrexsub/pubsub.go rename to share/shwap/p2p/shrex/shrexsub/pubsub.go index 9254dbdb96..d1861cfe12 100644 --- a/share/p2p/shrexsub/pubsub.go +++ b/share/shwap/p2p/shrex/shrexsub/pubsub.go @@ -10,14 +10,14 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/celestiaorg/celestia-node/share" - pb "github.com/celestiaorg/celestia-node/share/p2p/shrexsub/pb" + pb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub/pb" ) var log = logging.Logger("shrex-sub") // pubsubTopic hardcodes the name of the EDS floodsub topic with the provided networkID. func pubsubTopicID(networkID string) string { - return fmt.Sprintf("%s/eds-sub/v0.1.0", networkID) + return fmt.Sprintf("%s/eds-sub/v0.2.0", networkID) } // ValidatorFn is an injectable func and governs EDS notification msg validity. diff --git a/share/p2p/shrexsub/pubsub_test.go b/share/shwap/p2p/shrex/shrexsub/pubsub_test.go similarity index 97% rename from share/p2p/shrexsub/pubsub_test.go rename to share/shwap/p2p/shrex/shrexsub/pubsub_test.go index 788d4ca73c..78c2141852 100644 --- a/share/p2p/shrexsub/pubsub_test.go +++ b/share/shwap/p2p/shrex/shrexsub/pubsub_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/rand" - pb "github.com/celestiaorg/celestia-node/share/p2p/shrexsub/pb" + pb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub/pb" ) func TestPubSub(t *testing.T) { diff --git a/share/p2p/shrexsub/subscription.go b/share/shwap/p2p/shrex/shrexsub/subscription.go similarity index 94% rename from share/p2p/shrexsub/subscription.go rename to share/shwap/p2p/shrex/shrexsub/subscription.go index 32a3e65e51..5021f090c2 100644 --- a/share/p2p/shrexsub/subscription.go +++ b/share/shwap/p2p/shrex/shrexsub/subscription.go @@ -6,7 +6,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" - pb "github.com/celestiaorg/celestia-node/share/p2p/shrexsub/pb" + pb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub/pb" ) // Subscription is a wrapper over pubsub.Subscription that handles