From ac5d7a7e2f9b8234cdc28c468f5241a39967eb18 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Tue, 10 Dec 2024 17:24:07 +0100 Subject: [PATCH 01/11] refactor(share/availability/light | share/availability/full): Availability implementations are aware of sampling window, removed from DASer (#3957) --- core/eds.go | 3 +- core/exchange.go | 6 ++- core/exchange_test.go | 49 +++++++++++++++++ core/listener.go | 4 +- core/option.go | 8 +++ das/daser.go | 14 ----- das/daser_test.go | 41 --------------- das/options.go | 13 ----- das/worker.go | 5 +- nodebuilder/das/constructors.go | 4 -- nodebuilder/pruner/module.go | 13 ++--- share/availability/full/availability.go | 28 ++++++++-- share/availability/full/availability_test.go | 52 +++++++++++++++++++ share/availability/full/options.go | 25 +++++++++ share/availability/light/availability.go | 9 ++++ share/availability/light/availability_test.go | 1 + share/availability/light/options.go | 2 +- share/availability/window.go | 3 ++ 18 files changed, 192 insertions(+), 88 deletions(-) create mode 100644 share/availability/full/options.go diff --git a/core/eds.go b/core/eds.go index 5e95f30103..2e8ce7ea19 100644 --- a/core/eds.go +++ b/core/eds.go @@ -62,8 +62,9 @@ func storeEDS( eds *rsmt2d.ExtendedDataSquare, store *store.Store, window time.Duration, + archival bool, ) error { - if !availability.IsWithinWindow(eh.Time(), window) { + if !archival && !availability.IsWithinWindow(eh.Time(), window) { log.Debugw("skipping storage of historic block", "height", eh.Height()) return nil } diff --git a/core/exchange.go b/core/exchange.go index a813955968..372906ff85 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -23,6 +23,7 @@ type Exchange struct { construct header.ConstructFn availabilityWindow time.Duration + archival bool metrics *exchangeMetrics } @@ -54,6 +55,7 @@ func NewExchange( store: store, construct: construct, availabilityWindow: p.availabilityWindow, + archival: p.archival, metrics: metrics, }, nil } @@ -147,7 +149,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende &block.Height, hash, eh.Hash()) } - err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow) + err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival) if err != nil { return nil, err } @@ -187,7 +189,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)) } - err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow) + err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival) if err != nil { return nil, err } diff --git a/core/exchange_test.go b/core/exchange_test.go index f7e69be8a4..a2187ed7c8 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -111,6 +111,55 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) { } } +// TestExchange_StoreHistoricIfArchival makes sure blocks are stored past +// sampling window if archival is enabled +func TestExchange_StoreHistoricIfArchival(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + cfg := DefaultTestConfig() + fetcher, cctx := createCoreFetcher(t, cfg) + + generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) + + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) + + ce, err := NewExchange( + fetcher, + store, + header.MakeExtendedHeader, + WithAvailabilityWindow(time.Nanosecond), // all blocks will be "historic" + WithArchivalMode(), // make sure to store them anyway + ) + require.NoError(t, err) + + // initialize store with genesis block + genHeight := int64(1) + genBlock, err := fetcher.GetBlock(ctx, &genHeight) + require.NoError(t, err) + genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) + require.NoError(t, err) + + headers, err := ce.GetRangeByHeight(ctx, genHeader, 30) + require.NoError(t, err) + + // ensure all "historic" EDSs were stored + for _, h := range headers { + has, err := store.HasByHeight(ctx, h.Height()) + require.NoError(t, err) + assert.True(t, has) + + // empty EDSs are expected to exist in the store, so we skip them + if h.DAH.Equals(share.EmptyEDSRoots()) { + continue + } + has, err = store.HasByHash(ctx, h.DAH.Hash()) + require.NoError(t, err) + assert.True(t, has) + } +} + func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) { cctx := StartTestNodeWithConfig(t, cfg) // wait for height 2 in order to be able to start submitting txs (this prevents diff --git a/core/listener.go b/core/listener.go index 1d7dc1b061..d403421175 100644 --- a/core/listener.go +++ b/core/listener.go @@ -38,6 +38,7 @@ type Listener struct { construct header.ConstructFn store *store.Store availabilityWindow time.Duration + archival bool headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader] hashBroadcaster shrexsub.BroadcastFn @@ -83,6 +84,7 @@ func NewListener( construct: construct, store: store, availabilityWindow: p.availabilityWindow, + archival: p.archival, listenerTimeout: 5 * blocktime, metrics: metrics, chainID: p.chainID, @@ -237,7 +239,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS panic(fmt.Errorf("making extended header: %w", err)) } - err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow) + err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow, cl.archival) if err != nil { return fmt.Errorf("storing EDS: %w", err) } diff --git a/core/option.go b/core/option.go index 3e9b5a8e20..874246bd84 100644 --- a/core/option.go +++ b/core/option.go @@ -12,11 +12,13 @@ type params struct { metrics bool chainID string availabilityWindow time.Duration + archival bool } func defaultParams() params { return params{ availabilityWindow: time.Duration(0), + archival: false, } } @@ -39,3 +41,9 @@ func WithAvailabilityWindow(window time.Duration) Option { p.availabilityWindow = window } } + +func WithArchivalMode() Option { + return func(p *params) { + p.archival = true + } +} diff --git a/das/daser.go b/das/daser.go index fa41bd56ad..d255d4e293 100644 --- a/das/daser.go +++ b/das/daser.go @@ -14,18 +14,12 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" ) var log = logging.Logger("das") -// errOutsideSamplingWindow is an error used to inform -// the caller of Sample that the given header is outside -// the sampling window. -var errOutsideSamplingWindow = fmt.Errorf("skipping header outside of sampling window") - // DASer continuously validates availability of data committed to headers. type DASer struct { params Parameters @@ -160,14 +154,6 @@ func (d *DASer) Stop(ctx context.Context) error { } func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error { - // short-circuit if pruning is enabled and the header is outside the - // availability window - if !availability.IsWithinWindow(h.Time(), d.params.samplingWindow) { - log.Debugw("skipping header outside sampling window", "height", h.Height(), - "time", h.Time()) - return errOutsideSamplingWindow - } - err := d.da.SharesAvailable(ctx, h) if err != nil { var byzantineErr *byzantine.ErrByzantine diff --git a/das/daser_test.go b/das/daser_test.go index 2cdc43497a..ac1d3b6190 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -3,7 +3,6 @@ package das import ( "context" "fmt" - "strconv" "testing" "time" @@ -20,7 +19,6 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/availability/mocks" "github.com/celestiaorg/celestia-node/share/eds/edstest" ) @@ -243,45 +241,6 @@ func TestDASerSampleTimeout(t *testing.T) { } } -// TestDASer_SamplingWindow tests the sampling window determination -// for headers. -func TestDASer_SamplingWindow(t *testing.T) { - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - sub := new(headertest.Subscriber) - fserv := &fraudtest.DummyService[*header.ExtendedHeader]{} - getter := getterStub{} - avail := mocks.NewMockAvailability(gomock.NewController(t)) - - // create and start DASer - daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1), - WithSamplingWindow(time.Second)) - require.NoError(t, err) - - tests := []struct { - timestamp time.Time - withinWindow bool - }{ - {timestamp: time.Now().Add(-(time.Second * 5)), withinWindow: false}, - {timestamp: time.Now().Add(-(time.Millisecond * 800)), withinWindow: true}, - {timestamp: time.Now().Add(-(time.Hour)), withinWindow: false}, - {timestamp: time.Now().Add(-(time.Hour * 24 * 30)), withinWindow: false}, - {timestamp: time.Now(), withinWindow: true}, - } - - for i, tt := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - eh := headertest.RandExtendedHeader(t) - eh.RawHeader.Time = tt.timestamp - - assert.Equal( - t, - tt.withinWindow, - availability.IsWithinWindow(eh.Time(), daser.params.samplingWindow), - ) - }) - } -} - // createDASerSubcomponents takes numGetter (number of headers // to store in mockGetter) and numSub (number of headers to store // in the mock header.Subscriber), returning a newly instantiated diff --git a/das/options.go b/das/options.go index 6a665d5577..cd7444ac16 100644 --- a/das/options.go +++ b/das/options.go @@ -41,11 +41,6 @@ type Parameters struct { // divided between parallel workers. SampleTimeout should be adjusted proportionally to // ConcurrencyLimit. SampleTimeout time.Duration - - // samplingWindow determines the time window that headers should fall into - // in order to be sampled. If set to 0, the sampling window will include - // all headers. - samplingWindow time.Duration } // DefaultParameters returns the default configuration values for the daser parameters @@ -161,11 +156,3 @@ func WithSampleTimeout(sampleTimeout time.Duration) Option { d.params.SampleTimeout = sampleTimeout } } - -// WithSamplingWindow is a functional option to configure the DASer's -// `samplingWindow` parameter. -func WithSamplingWindow(samplingWindow time.Duration) Option { - return func(d *DASer) { - d.params.samplingWindow = samplingWindow - } -} diff --git a/das/worker.go b/das/worker.go index 0acd4b41a6..b9dff58445 100644 --- a/das/worker.go +++ b/das/worker.go @@ -10,6 +10,7 @@ import ( libhead "github.com/celestiaorg/go-header" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" ) @@ -83,7 +84,7 @@ func (w *worker) run(ctx context.Context, timeout time.Duration, resultCh chan<- // sampling worker will resume upon restart return } - if errors.Is(err, errOutsideSamplingWindow) { + if errors.Is(err, availability.ErrOutsideSamplingWindow) { skipped++ err = nil } @@ -119,7 +120,7 @@ func (w *worker) sample(ctx context.Context, timeout time.Duration, height uint6 defer cancel() err = w.sampleFn(ctx, h) - if errors.Is(err, errOutsideSamplingWindow) { + if errors.Is(err, availability.ErrOutsideSamplingWindow) { // if header is outside sampling window, do not log // or record it. return err diff --git a/nodebuilder/das/constructors.go b/nodebuilder/das/constructors.go index b9b7bdf100..db35c1a0c2 100644 --- a/nodebuilder/das/constructors.go +++ b/nodebuilder/das/constructors.go @@ -12,7 +12,6 @@ import ( "github.com/celestiaorg/celestia-node/das" "github.com/celestiaorg/celestia-node/header" modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" - modshare "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub" @@ -45,11 +44,8 @@ func newDASer( batching datastore.Batching, fraudServ fraud.Service[*header.ExtendedHeader], bFn shrexsub.BroadcastFn, - availWindow modshare.Window, options ...das.Option, ) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer, *header.ExtendedHeader], error) { - options = append(options, das.WithSamplingWindow(availWindow.Duration())) - ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...) if err != nil { return nil, nil, err diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go index 537d9127f2..22b86ee6ec 100644 --- a/nodebuilder/pruner/module.go +++ b/nodebuilder/pruner/module.go @@ -15,6 +15,7 @@ import ( "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/pruner/full" "github.com/celestiaorg/celestia-node/share/availability" + fullavail "github.com/celestiaorg/celestia-node/share/availability/full" "github.com/celestiaorg/celestia-node/share/availability/light" "github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery" ) @@ -59,6 +60,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { baseComponents, prunerService, fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)), + fx.Supply([]fullavail.Option{}), ) } return fx.Module("prune", @@ -66,6 +68,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Invoke(func(ctx context.Context, ds datastore.Batching) error { return pruner.DetectPreviousRun(ctx, ds) }), + fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}), ) case node.Bridge: if cfg.EnableService { @@ -73,9 +76,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { baseComponents, prunerService, fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)), - fx.Provide(func(window modshare.Window) []core.Option { - return []core.Option{core.WithAvailabilityWindow(window.Duration())} - }), + fx.Supply([]fullavail.Option{}), + fx.Supply([]core.Option{}), ) } return fx.Module("prune", @@ -83,9 +85,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Invoke(func(ctx context.Context, ds datastore.Batching) error { return pruner.DetectPreviousRun(ctx, ds) }), - fx.Provide(func() []core.Option { - return []core.Option{} - }), + fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}), + fx.Supply([]core.Option{core.WithArchivalMode()}), ) default: panic("unknown node type") diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index b95a648d5d..c7f7999366 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" logging "github.com/ipfs/go-log/v2" @@ -23,16 +24,27 @@ var log = logging.Logger("share/full") type ShareAvailability struct { store *store.Store getter shwap.Getter + + storageWindow time.Duration + archival bool } // NewShareAvailability creates a new full ShareAvailability. func NewShareAvailability( store *store.Store, getter shwap.Getter, + opts ...Option, ) *ShareAvailability { + p := defaultParams() + for _, opt := range opts { + opt(p) + } + return &ShareAvailability{ - store: store, - getter: getter, + store: store, + getter: getter, + storageWindow: availability.StorageWindow, + archival: p.archival, } } @@ -40,6 +52,16 @@ func NewShareAvailability( // enough Shares from the network. func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header.ExtendedHeader) error { dah := header.DAH + + if !fa.archival { + // do not sync blocks outside of sampling window if not archival + if !availability.IsWithinWindow(header.Time(), fa.storageWindow) { + log.Debugw("skipping availability check for block outside sampling"+ + " window", "height", header.Height(), "data hash", dah.String()) + return availability.ErrOutsideSamplingWindow + } + } + // if the data square is empty, we can safely link the header height in the store to an empty EDS. if share.DataHash(dah.Hash()).IsEmptyEDS() { err := fa.store.PutODSQ4(ctx, dah, header.Height(), share.EmptyEDS()) @@ -68,7 +90,7 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header } // archival nodes should not store Q4 outside the availability window. - if availability.IsWithinWindow(header.Time(), availability.StorageWindow) { + if availability.IsWithinWindow(header.Time(), fa.storageWindow) { err = fa.store.PutODSQ4(ctx, dah, header.Height(), eds) } else { err = fa.store.PutODS(ctx, dah, header.Height(), eds) diff --git a/share/availability/full/availability_test.go b/share/availability/full/availability_test.go index 95f8bda533..24435ee0d8 100644 --- a/share/availability/full/availability_test.go +++ b/share/availability/full/availability_test.go @@ -6,10 +6,12 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/eds/edstest" "github.com/celestiaorg/celestia-node/share/shwap" "github.com/celestiaorg/celestia-node/share/shwap/getters/mock" @@ -97,3 +99,53 @@ func TestSharesAvailable_ErrNotAvailable(t *testing.T) { require.ErrorIs(t, err, share.ErrNotAvailable) } } + +// TestSharesAvailable_OutsideSamplingWindow_NonArchival tests to make sure +// blocks are skipped that are outside sampling window. +func TestSharesAvailable_OutsideSamplingWindow_NonArchival(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + getter := mock.NewMockGetter(gomock.NewController(t)) + getter.EXPECT().GetEDS(gomock.Any(), gomock.Any()).Times(0) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) + + suite := headertest.NewTestSuite(t, 3, time.Nanosecond) + headers := suite.GenExtendedHeaders(10) + + avail := NewShareAvailability(store, getter) + avail.storageWindow = time.Nanosecond // make all headers outside sampling window + + for _, h := range headers { + err := avail.SharesAvailable(ctx, h) + assert.ErrorIs(t, err, availability.ErrOutsideSamplingWindow) + } +} + +// TestSharesAvailable_OutsideSamplingWindow_Archival tests to make sure +// blocks are still synced that are outside sampling window. +func TestSharesAvailable_OutsideSamplingWindow_Archival(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + getter := mock.NewMockGetter(gomock.NewController(t)) + store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) + require.NoError(t, err) + + eds := edstest.RandEDS(t, 4) + roots, err := share.NewAxisRoots(eds) + eh := headertest.RandExtendedHeaderWithRoot(t, roots) + require.NoError(t, err) + + getter.EXPECT().GetEDS(gomock.Any(), gomock.Any()).Times(1).Return(eds, nil) + + avail := NewShareAvailability(store, getter, WithArchivalMode()) + avail.storageWindow = time.Nanosecond // make all headers outside sampling window + + err = avail.SharesAvailable(ctx, eh) + require.NoError(t, err) + has, err := store.HasByHash(ctx, roots.Hash()) + require.NoError(t, err) + assert.True(t, has) +} diff --git a/share/availability/full/options.go b/share/availability/full/options.go new file mode 100644 index 0000000000..689a225a9a --- /dev/null +++ b/share/availability/full/options.go @@ -0,0 +1,25 @@ +package full + +type params struct { + archival bool +} + +// Option is a function that configures light availability Parameters +type Option func(*params) + +// DefaultParameters returns the default Parameters' configuration values +// for the light availability implementation +func defaultParams() *params { + return ¶ms{ + archival: false, + } +} + +// WithArchivalMode is a functional option to tell the full availability +// implementation that the node wants to sync *all* blocks, not just those +// within the sampling window. +func WithArchivalMode() Option { + return func(p *params) { + p.archival = true + } +} diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index d26aee281d..f3afbb26fd 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -17,6 +17,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/libs/utils" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/availability" "github.com/celestiaorg/celestia-node/share/ipld" "github.com/celestiaorg/celestia-node/share/shwap" "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap" @@ -37,6 +38,8 @@ type ShareAvailability struct { bs blockstore.Blockstore params Parameters + storageWindow time.Duration + activeHeights *utils.Sessions dsLk sync.RWMutex ds *autobatch.Datastore @@ -61,6 +64,7 @@ func NewShareAvailability( getter: getter, bs: bs, params: params, + storageWindow: availability.StorageWindow, activeHeights: utils.NewSessions(), ds: autoDS, } @@ -76,6 +80,11 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header return nil } + // short-circuit if outside sampling window + if !availability.IsWithinWindow(header.Time(), la.storageWindow) { + return availability.ErrOutsideSamplingWindow + } + // Prevent multiple sampling and pruning sessions for the same header height release, err := la.activeHeights.StartSession(ctx, header.Height()) if err != nil { diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index b1a9299bea..191c286938 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -517,6 +517,7 @@ func randEdsAndHeader(t *testing.T, size int) (*rsmt2d.ExtendedDataSquare, *head h := &header.ExtendedHeader{ RawHeader: header.RawHeader{ Height: int64(height), + Time: time.Now(), }, DAH: roots, } diff --git a/share/availability/light/options.go b/share/availability/light/options.go index 466b9fb030..3ed8038f51 100644 --- a/share/availability/light/options.go +++ b/share/availability/light/options.go @@ -4,7 +4,7 @@ import ( "fmt" ) -// SampleAmount specifies the minimum required amount of samples a light node must perform +// DefaultSampleAmount specifies the minimum required amount of samples a light node must perform // before declaring that a block is available var ( DefaultSampleAmount uint = 16 diff --git a/share/availability/window.go b/share/availability/window.go index 6227c54d64..cdf60c106b 100644 --- a/share/availability/window.go +++ b/share/availability/window.go @@ -1,6 +1,7 @@ package availability import ( + "errors" "os" "time" ) @@ -10,6 +11,8 @@ const ( StorageWindow = RequestWindow + time.Hour ) +var ErrOutsideSamplingWindow = errors.New("timestamp outside sampling window") + // IsWithinWindow checks whether the given timestamp is within the // given AvailabilityWindow. If the window is disabled (0), it returns true for // every timestamp. From 7af07bd08f4eed66fbc9eee47762fd2c3365c89f Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 11 Dec 2024 11:28:04 +0100 Subject: [PATCH 02/11] fix(api): add time.Duration and rsmt2d.Axis types (#3994) --- api/docgen/examples.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/api/docgen/examples.go b/api/docgen/examples.go index 8d56506d1b..640cfe8ab3 100644 --- a/api/docgen/examples.go +++ b/api/docgen/examples.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "reflect" + "time" "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" @@ -63,6 +64,7 @@ var ExampleValues = map[reflect.Type]interface{}{ reflect.TypeOf(float64(42)): float64(42), reflect.TypeOf(true): true, reflect.TypeOf([]byte{}): []byte("byte array"), + reflect.TypeOf(time.Duration(0)): time.Second, reflect.TypeOf(node.Full): node.Full, reflect.TypeOf(auth.Permission("admin")): auth.Permission("admin"), reflect.TypeOf(byzantine.BadEncoding): byzantine.BadEncoding, @@ -188,6 +190,8 @@ func init() { state.WithFeeGranterAddress("celestia1hakc56ax66ypjcmwj8w6hyr2c4g8cfs3wesguc"), ) addToExampleValues(txConfig) + + addToExampleValues(rsmt2d.Row) } func addToExampleValues(v interface{}) { From 01bb3e2db655fca370f530d2fb7177a39dd8acba Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Thu, 12 Dec 2024 06:53:29 +0200 Subject: [PATCH 03/11] fix(cmd/blob): fix namespace parsing in cli (#4003) --- nodebuilder/blob/cmd/blob.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/nodebuilder/blob/cmd/blob.go b/nodebuilder/blob/cmd/blob.go index 504533d2fc..17eec5bbf1 100644 --- a/nodebuilder/blob/cmd/blob.go +++ b/nodebuilder/blob/cmd/blob.go @@ -43,11 +43,11 @@ var getCmd = &cobra.Command{ Short: "Returns the blob for the given namespace by commitment at a particular height.\n" + "Note:\n* Both namespace and commitment input parameters are expected to be in their hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { - if !strings.HasPrefix(args[1], "0x") { - return fmt.Errorf("only hex namespace is supported") + if !strings.HasPrefix(args[0], "0x") { + args[0] = "0x" + args[0] } - if !strings.HasPrefix(args[2], "0x") { - return fmt.Errorf("only hex commitment is supported") + if !strings.HasPrefix(args[1], "0x") { + args[1] = "0x" + args[1] } return nil }, @@ -84,8 +84,11 @@ var getAllCmd = &cobra.Command{ Short: "Returns all blobs for the given namespace at a particular height.\n" + "Note:\n* Namespace input parameter is expected to be in its hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { + if !strings.HasPrefix(args[0], "0x") { + args[0] = "0x" + args[0] + } if !strings.HasPrefix(args[1], "0x") { - return fmt.Errorf("only hex namespace is supported") + args[1] = "0x" + args[1] } return nil }, @@ -136,7 +139,10 @@ var submitCmd = &cobra.Command{ }, PreRunE: func(_ *cobra.Command, args []string) error { if !strings.HasPrefix(args[0], "0x") { - return fmt.Errorf("only hex namespace is supported") + args[0] = "0x" + args[0] + } + if !strings.HasPrefix(args[1], "0x") { + args[1] = "0x" + args[1] } return nil }, @@ -235,11 +241,11 @@ var getProofCmd = &cobra.Command{ Short: "Retrieves the blob in the given namespaces at the given height by commitment and returns its Proof.\n" + "Note:\n* Both namespace and commitment input parameters are expected to be in their hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { - if !strings.HasPrefix(args[1], "0x") { - return fmt.Errorf("only hex namespace is supported") + if !strings.HasPrefix(args[0], "0x") { + args[0] = "0x" + args[0] } - if !strings.HasPrefix(args[2], "0x") { - return fmt.Errorf("only hex commitment is supported") + if !strings.HasPrefix(args[1], "0x") { + args[1] = "0x" + args[1] } return nil }, From 7c586c7d41c87b23725812236072e8241c45305e Mon Sep 17 00:00:00 2001 From: Nguyen Nhu Viet Date: Fri, 13 Dec 2024 12:31:50 +0100 Subject: [PATCH 04/11] chore/make: add celestia-node make file for arabica testnet (#3992) Co-authored-by: Viacheslav --- Makefile | 3 ++ README.md | 37 ++++++++++++++++- celestia-node.mk | 101 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 celestia-node.mk diff --git a/Makefile b/Makefile index def65cc137..8c1d06f0a8 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,9 @@ ifeq ($(SHORT),true) else INTEGRATION_RUN_LENGTH = endif + +include celestia-node.mk + ## help: Get more info on make commands. help: Makefile @echo " Choose a command run in "$(PROJECTNAME)":" diff --git a/README.md b/README.md index d4667e6fb0..6db1959599 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Continue reading [here](https://blog.celestia.org/celestia-mvp-release-data-avai - [API docs](#api-docs) - [Node types](#node-types) - [Run a node](#run-a-node) + - [Quick Start with Light Node on arabica](#quick-start-with-light-node-on-arabica) - [Environment variables](#environment-variables) - [Package-specific documentation](#package-specific-documentation) - [Code of Conduct](#code-of-conduct) @@ -31,7 +32,7 @@ Continue reading [here](https://blog.celestia.org/celestia-mvp-release-data-avai ## Minimum requirements | Requirement | Notes | -| ----------- |----------------| +| ----------- | -------------- | | Go version | 1.23 or higher | ## System Requirements @@ -79,6 +80,40 @@ celestia start Please refer to [this guide](https://docs.celestia.org/nodes/celestia-node/) for more information on running a node. +### Quick Start with Light Node on arabica + +View available commands and their usage: + +```sh +make node-help +``` + +Install celestia node and cel-key binaries: + +```sh +make node-install +``` + +Start a light node with automated setup: + +```sh +make light-arabica-up +``` + +This command: + +- Automatically checks wallet balance +- Requests funds from faucet if needed +- Sets node height to latest-1 for quick startup +- Initializes the node if running for the first time + +Options: + +```sh +make light-arabica-up COMMAND=again # Reset node state to latest height +make light-arabica-up CORE_IP= # Use custom core IP +``` + ## Environment variables | Variable | Explanation | Default value | Required | diff --git a/celestia-node.mk b/celestia-node.mk new file mode 100644 index 0000000000..2729acc13b --- /dev/null +++ b/celestia-node.mk @@ -0,0 +1,101 @@ +# Celestia Node Management Rules +# These rules can be included in the main Makefile + +.PHONY: install get-address check-and-fund reset-node light-up node-help + +node-help: + @echo "Celestia Light Node Management Commands:" + @echo "" + @echo "Available targets:" + @echo " node-install - Install celestia node and cel-key binaries" + @echo " get-address - Display the wallet address from cel-key" + @echo " check-and-fund - Check wallet balance and request funds if needed" + @echo " reset-node - Reset node state and update config with latest block height" + @echo " light-arabica-up - Start the Celestia light node" + @echo "" + @echo "Special usage:" + @echo " light-arabica-up options:" + @echo " COMMAND=again - Reset the node before starting" + @echo " CORE_IP= - Use custom IP instead of default validator" + @echo "" + @echo "Examples:" + @echo " make light-arabica-up" + @echo " make light-arabica-up COMMAND=again" + @echo " make light-arabica-up CORE_IP=custom.ip.address" + @echo " make light-arabica-up COMMAND=again CORE_IP=custom.ip.address" + +# Install celestia node and cel-key binaries +node-install: + make install + make cel-key + +# Get wallet address from cel-key +get-address: + @address=$$(cel-key list --node.type light --p2p.network arabica | grep "address: " | cut -d' ' -f3); \ + echo $$address + +# Check balance and fund if needed +check-and-fund: + @address=$$(cel-key list --node.type light --p2p.network arabica | grep "address: " | cut -d' ' -f3); \ + echo "Checking balance for address: $$address"; \ + balance=$$(curl -s "https://api.celestia-arabica-11.com/cosmos/bank/v1beta1/balances/$$address" | jq -r '.balances[] | select(.denom == "utia") | .amount // "0"'); \ + if [[ $$balance =~ ^[0-9]+$$ ]]; then \ + balance_tia=$$(echo "scale=6; $$balance/1000000" | bc); \ + echo "Current balance: $$balance_tia TIA"; \ + else \ + balance_tia=0; \ + fi; \ + if (( $$(echo "$$balance_tia < 1" | bc -l) )); then \ + echo "Balance too low. Requesting funds from faucet..."; \ + curl -X POST 'https://faucet.celestia-arabica-11.com/api/v1/faucet/give_me' \ + -H 'Content-Type: application/json' \ + -d '{"address": "'$$address'", "chainId": "arabica-11" }'; \ + echo "Waiting 10 seconds for transaction to process..."; \ + sleep 10; \ + fi + +# Reset node state and update config +reset-node: + @echo "Resetting node state..." + @celestia light unsafe-reset-store --p2p.network arabica + @echo "Getting latest block height and hash..." + @block_response=$$(curl -s https://rpc.celestia-arabica-11.com/block); \ + latest_block=$$(echo $$block_response | jq -r '.result.block.header.height'); \ + latest_hash=$$(echo $$block_response | jq -r '.result.block_id.hash'); \ + echo "Latest block height: $$latest_block"; \ + echo "Latest block hash: $$latest_hash"; \ + config_file="$$HOME/.celestia-light-arabica-11/config.toml"; \ + echo "Updating config.toml..."; \ + sed -i.bak -e "s/\(TrustedHash[[:space:]]*=[[:space:]]*\).*/\1\"$$latest_hash\"/" \ + -e "s/\(SampleFrom[[:space:]]*=[[:space:]]*\).*/\1$$latest_block/" \ + "$$config_file"; \ + echo "Configuration updated successfully" + +# Start the Celestia light node +# Usage: make light-arabica-up [COMMAND=again] [CORE_IP=custom_ip] +light-arabica-up: + @config_file="$$HOME/.celestia-light-arabica-11/config.toml"; \ + if [ "$(COMMAND)" = "again" ]; then \ + $(MAKE) reset-node; \ + fi; \ + if [ -e "$$config_file" ]; then \ + echo "Using config file: $$config_file"; \ + else \ + celestia light init --p2p.network arabica; \ + $(MAKE) reset-node; \ + $(MAKE) check-and-fund; \ + fi; \ + $(MAKE) check-and-fund; \ + if [ -n "$(CORE_IP)" ]; then \ + celestia light start \ + --core.ip $(CORE_IP) \ + --rpc.skip-auth \ + --rpc.addr 0.0.0.0 \ + --p2p.network arabica; \ + else \ + celestia light start \ + --core.ip validator-1.celestia-arabica-11.com \ + --rpc.skip-auth \ + --rpc.addr 0.0.0.0 \ + --p2p.network arabica; \ + fi From 1d555a3234e622d225260fc5a5bbad02979c0f40 Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Fri, 13 Dec 2024 13:55:02 +0200 Subject: [PATCH 05/11] fix(sync/test): fix TestSyncStartStopLightWithBridge (#3990) --- nodebuilder/tests/sync_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nodebuilder/tests/sync_test.go b/nodebuilder/tests/sync_test.go index 1a63975574..48de6376a3 100644 --- a/nodebuilder/tests/sync_test.go +++ b/nodebuilder/tests/sync_test.go @@ -250,6 +250,7 @@ func TestSyncStartStopLightWithBridge(t *testing.T) { light = sw.NewLightNode() require.NoError(t, light.Start(ctx)) + lightClient = getAdminClient(ctx, light, t) // ensure when light node comes back up, it can sync the remainder of the chain it // missed while sleeping From af7a0b70c7961537f7da770a33074b3ff85215cc Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Fri, 13 Dec 2024 16:22:09 +0100 Subject: [PATCH 06/11] perf(shwap): cache Both Row sides (#4005) --- share/eds/rsmt2d.go | 7 +-- share/shwap/row.go | 97 +++++++++++++++++++++++++++-------------- share/shwap/row_test.go | 52 +++++++++------------- 3 files changed, 86 insertions(+), 70 deletions(-) diff --git a/share/eds/rsmt2d.go b/share/eds/rsmt2d.go index 6c244de700..1d806324b5 100644 --- a/share/eds/rsmt2d.go +++ b/share/eds/rsmt2d.go @@ -99,12 +99,7 @@ func (eds *Rsmt2D) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int // HalfRow constructs a new shwap.Row from an Extended Data Square based on the specified index and // side. func (eds *Rsmt2D) HalfRow(idx int, side shwap.RowSide) (shwap.Row, error) { - shares := eds.ExtendedDataSquare.Row(uint(idx)) - sh, err := libshare.FromBytes(shares) - if err != nil { - return shwap.Row{}, fmt.Errorf("while converting shares from bytes: %w", err) - } - return shwap.RowFromShares(sh, side), nil + return shwap.RowFromEDS(eds.ExtendedDataSquare, idx, side) } // RowNamespaceData returns data for the given namespace and row index. diff --git a/share/shwap/row.go b/share/shwap/row.go index ba2c0d0b82..7057680573 100644 --- a/share/shwap/row.go +++ b/share/shwap/row.go @@ -6,6 +6,7 @@ import ( "github.com/celestiaorg/celestia-app/v3/pkg/wrapper" libshare "github.com/celestiaorg/go-square/v2/share" + "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/shwap/pb" @@ -20,33 +21,42 @@ type RowSide int const ( Left RowSide = iota // Left side of the row. Right // Right side of the row. + Both // Both sides of the row. ) // Row represents a portion of a row in an EDS, either left or right half. type Row struct { - halfShares []libshare.Share // halfShares holds the shares of either the left or right half of a row. - side RowSide // side indicates whether the row half is left or right. + shares []libshare.Share // holds the shares of Left or Right or Both sides of the row. + side RowSide // side indicates which side the shares belong to. } // NewRow creates a new Row with the specified shares and side. -func NewRow(halfShares []libshare.Share, side RowSide) Row { +func NewRow(shares []libshare.Share, side RowSide) Row { return Row{ - halfShares: halfShares, - side: side, + shares: shares, + side: side, } } -// RowFromShares constructs a new Row from an Extended Data Square based on the specified index and -// side. -func RowFromShares(shares []libshare.Share, side RowSide) Row { - var halfShares []libshare.Share - if side == Right { - halfShares = shares[len(shares)/2:] // Take the right half of the shares. - } else { - halfShares = shares[:len(shares)/2] // Take the left half of the shares. +// RowFromEDS constructs a new Row from an EDS based on the specified row index and side. +func RowFromEDS(eds *rsmt2d.ExtendedDataSquare, rowIdx int, side RowSide) (Row, error) { + rowBytes := eds.Row(uint(rowIdx)) + shares, err := libshare.FromBytes(rowBytes) + if err != nil { + return Row{}, fmt.Errorf("while converting shares from bytes: %w", err) + } + + switch side { + case Both: + case Left: + shares = shares[:len(shares)/2] + case Right: + shares = shares[len(shares)/2:] + default: + return Row{}, fmt.Errorf("invalid RowSide: %d", side) } - return NewRow(halfShares, side) + return NewRow(shares, side), nil } // RowFromProto converts a protobuf Row to a Row structure. @@ -56,20 +66,22 @@ func RowFromProto(r *pb.Row) (Row, error) { return Row{}, err } return Row{ - halfShares: shrs, - side: sideFromProto(r.GetHalfSide()), + shares: shrs, + side: sideFromProto(r.GetHalfSide()), }, nil } // Shares reconstructs the complete row shares from the half provided, using RSMT2D for data // recovery if needed. -func (r Row) Shares() ([]libshare.Share, error) { - shares := make([]libshare.Share, len(r.halfShares)*2) - offset := 0 - if r.side == Right { - offset = len(r.halfShares) // Position the halfShares in the second half if it's the right side. +// It caches the reconstructed shares for future use and converts Row to Both side. +func (r *Row) Shares() ([]libshare.Share, error) { + if r.side == Both { + return r.shares, nil } - for i, share := range r.halfShares { + + shares := make([]libshare.Share, len(r.shares)*2) + offset := len(r.shares) * int(r.side) + for i, share := range r.shares { shares[i+offset] = share } @@ -77,33 +89,52 @@ func (r Row) Shares() ([]libshare.Share, error) { if err != nil { return nil, err } - return libshare.FromBytes(rowShares) + + r.shares, err = libshare.FromBytes(rowShares) + if err != nil { + return nil, err + } + + r.side = Both + return r.shares, nil } // ToProto converts the Row to its protobuf representation. func (r Row) ToProto() *pb.Row { + if r.side == Both { + // we don't need to send the whole row over the wire + // so if we have both sides, we can save bandwidth and send the left half only + return &pb.Row{ + SharesHalf: SharesToProto(r.shares[:len(r.shares)/2]), + HalfSide: pb.Row_LEFT, + } + } + return &pb.Row{ - SharesHalf: SharesToProto(r.halfShares), + SharesHalf: SharesToProto(r.shares), HalfSide: r.side.ToProto(), } } // IsEmpty reports whether the Row is empty, i.e. doesn't contain any shares. func (r Row) IsEmpty() bool { - return r.halfShares == nil + return len(r.shares) == 0 } // Verify checks if the row's shares match the expected number from the root data and validates // the side of the row. -func (r Row) Verify(roots *share.AxisRoots, idx int) error { - if len(r.halfShares) == 0 { - return fmt.Errorf("empty half row") +func (r *Row) Verify(roots *share.AxisRoots, idx int) error { + if len(r.shares) == 0 { + return fmt.Errorf("empt row") + } + expectedShares := len(roots.RowRoots) + if r.side != Both { + expectedShares /= 2 } - expectedShares := len(roots.RowRoots) / 2 - if len(r.halfShares) != expectedShares { - return fmt.Errorf("shares size doesn't match root size: %d != %d", len(r.halfShares), expectedShares) + if len(r.shares) != expectedShares { + return fmt.Errorf("shares size doesn't match root size: %d != %d", len(r.shares), expectedShares) } - if r.side != Left && r.side != Right { + if r.side != Left && r.side != Right && r.side != Both { return fmt.Errorf("invalid RowSide: %d", r.side) } @@ -115,7 +146,7 @@ func (r Row) Verify(roots *share.AxisRoots, idx int) error { // verifyInclusion verifies the integrity of the row's shares against the provided root hash for the // given row index. -func (r Row) verifyInclusion(roots *share.AxisRoots, idx int) error { +func (r *Row) verifyInclusion(roots *share.AxisRoots, idx int) error { shrs, err := r.Shares() if err != nil { return fmt.Errorf("while extending shares: %w", err) diff --git a/share/shwap/row_test.go b/share/shwap/row_test.go index 9249ea87f4..16bce3893b 100644 --- a/share/shwap/row_test.go +++ b/share/shwap/row_test.go @@ -11,28 +11,20 @@ import ( "github.com/celestiaorg/celestia-node/share/eds/edstest" ) -func TestRowFromShares(t *testing.T) { +func TestRowShares(t *testing.T) { const odsSize = 8 eds := edstest.RandEDS(t, odsSize) for rowIdx := 0; rowIdx < odsSize*2; rowIdx++ { - for _, side := range []RowSide{Left, Right} { - shrs := eds.Row(uint(rowIdx)) - shares, err := libshare.FromBytes(shrs) + for _, side := range []RowSide{Left, Right, Both} { + row, err := RowFromEDS(eds, rowIdx, side) require.NoError(t, err) - row := RowFromShares(shares, side) + require.Equal(t, side, row.side) + extended, err := row.Shares() require.NoError(t, err) - require.Equal(t, shares, extended) - - var half []libshare.Share - if side == Right { - half = shares[odsSize:] - } else { - half = shares[:odsSize] - } - require.Equal(t, half, row.halfShares) - require.Equal(t, side, row.side) + require.Len(t, extended, odsSize*2) + require.Equal(t, Both, row.side) } } } @@ -44,11 +36,9 @@ func TestRowValidate(t *testing.T) { require.NoError(t, err) for rowIdx := 0; rowIdx < odsSize*2; rowIdx++ { - for _, side := range []RowSide{Left, Right} { - shrs := eds.Row(uint(rowIdx)) - shares, err := libshare.FromBytes(shrs) + for _, side := range []RowSide{Left, Right, Both} { + row, err := RowFromEDS(eds, rowIdx, side) require.NoError(t, err) - row := RowFromShares(shares, side) err = row.Verify(root, rowIdx) require.NoError(t, err) @@ -65,10 +55,10 @@ func TestRowValidateNegativeCases(t *testing.T) { shrs := eds.Row(0) shares, err := libshare.FromBytes(shrs) require.NoError(t, err) - row := RowFromShares(shares, Left) + row := NewRow(shares, Left) // Test with incorrect side specification - invalidSideRow := Row{halfShares: row.halfShares, side: RowSide(999)} + invalidSideRow := Row{shares: row.shares, side: RowSide(999)} err = invalidSideRow.Verify(root, 0) require.Error(t, err, "should error on invalid row side") @@ -79,12 +69,12 @@ func TestRowValidateNegativeCases(t *testing.T) { require.NoError(t, err) incorrectShares[i] = *shr } - invalidRow := Row{halfShares: incorrectShares, side: Left} + invalidRow := Row{shares: incorrectShares, side: Left} err = invalidRow.Verify(root, 0) require.Error(t, err, "should error on incorrect number of shares") // Test with empty shares - emptyRow := Row{halfShares: []libshare.Share{}, side: Left} + emptyRow := Row{shares: []libshare.Share{}, side: Left} err = emptyRow.Verify(root, 0) require.Error(t, err, "should error on empty halfShares") @@ -99,16 +89,18 @@ func TestRowProtoEncoding(t *testing.T) { eds := edstest.RandEDS(t, odsSize) for rowIdx := 0; rowIdx < odsSize*2; rowIdx++ { - for _, side := range []RowSide{Left, Right} { - shrs := eds.Row(uint(rowIdx)) - shares, err := libshare.FromBytes(shrs) + for _, side := range []RowSide{Left, Right, Both} { + row, err := RowFromEDS(eds, rowIdx, side) require.NoError(t, err) - row := RowFromShares(shares, side) pb := row.ToProto() rowOut, err := RowFromProto(pb) require.NoError(t, err) - require.Equal(t, row, rowOut) + if side == Both { + require.NotEqual(t, row, rowOut) + } else { + require.Equal(t, row, rowOut) + } } } } @@ -120,10 +112,8 @@ func BenchmarkRowValidate(b *testing.B) { eds := edstest.RandEDS(b, odsSize) root, err := share.NewAxisRoots(eds) require.NoError(b, err) - shrs := eds.Row(0) - shares, err := libshare.FromBytes(shrs) + row, err := RowFromEDS(eds, 0, Left) require.NoError(b, err) - row := RowFromShares(shares, Left) b.ResetTimer() for i := 0; i < b.N; i++ { From 4d3d3c20330347a4fafc04ad538b4ae0832ed50b Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 23 Dec 2024 10:55:33 +0100 Subject: [PATCH 07/11] refactor(docgen): simplify init (#3997) --- api/docgen/examples.go | 196 ++++++++++++++++++++--------------------- api/docgen/openrpc.go | 2 +- 2 files changed, 96 insertions(+), 102 deletions(-) diff --git a/api/docgen/examples.go b/api/docgen/examples.go index 640cfe8ab3..572d5c3f04 100644 --- a/api/docgen/examples.go +++ b/api/docgen/examples.go @@ -35,171 +35,158 @@ import ( "github.com/celestiaorg/celestia-node/state" ) -//go:embed "exampledata/extendedHeader.json" -var exampleExtendedHeader string - -//go:embed "exampledata/samplingStats.json" -var exampleSamplingStats string - -//go:embed "exampledata/txResponse.json" -var exampleTxResponse string - -//go:embed "exampledata/resourceManagerStats.json" -var exampleResourceMngrStats string - -//go:embed "exampledata/blob.json" -var exampleBlob string - -//go:embed "exampledata/blobProof.json" -var exampleBlobProof string - -var ExampleValues = map[reflect.Type]interface{}{ - reflect.TypeOf(""): "string value", - reflect.TypeOf(uint64(42)): uint64(42), - reflect.TypeOf(uint32(42)): uint32(42), - reflect.TypeOf(int32(42)): int32(42), - reflect.TypeOf(int64(42)): int64(42), - reflect.TypeOf(42): 42, - reflect.TypeOf(byte(7)): byte(7), - reflect.TypeOf(float64(42)): float64(42), - reflect.TypeOf(true): true, - reflect.TypeOf([]byte{}): []byte("byte array"), - reflect.TypeOf(time.Duration(0)): time.Second, - reflect.TypeOf(node.Full): node.Full, - reflect.TypeOf(auth.Permission("admin")): auth.Permission("admin"), - reflect.TypeOf(byzantine.BadEncoding): byzantine.BadEncoding, - reflect.TypeOf((*fraud.Proof[*header.ExtendedHeader])(nil)).Elem(): byzantine.CreateBadEncodingProof( +var ( + //go:embed "exampledata/extendedHeader.json" + exampleExtendedHeader string + + //go:embed "exampledata/samplingStats.json" + exampleSamplingStats string + + //go:embed "exampledata/txResponse.json" + exampleTxResponse string + + //go:embed "exampledata/resourceManagerStats.json" + exampleResourceMngrStats string + + //go:embed "exampledata/blob.json" + exampleBlob string + + //go:embed "exampledata/blobProof.json" + exampleBlobProof string +) + +var exampleValues = map[reflect.Type]any{} + +func add(v any) { + typ := reflect.TypeOf(v) + exampleValues[typ] = v +} + +func init() { + add("string value") + add(uint64(42)) + add(uint32(42)) + add(int32(42)) + add(int64(42)) + add(42) + add(byte(7)) + add(float64(42)) + add(float64(42)) + add(true) + add([]byte("byte array")) + add(time.Second) + add(node.Full) + add(auth.Permission("admin")) + add(byzantine.BadEncoding) + + // TODO: this case requires more debugging, simple to leave it as it was. + exampleValues[reflect.TypeOf((*fraud.Proof[*header.ExtendedHeader])(nil)).Elem()] = byzantine.CreateBadEncodingProof( []byte("bad encoding proof"), 42, &byzantine.ErrByzantine{ Index: 0, - Axis: rsmt2d.Axis(0), Shares: []*byzantine.ShareWithProof{}, + Axis: rsmt2d.Axis(0), }, - ), - reflect.TypeOf((*error)(nil)).Elem(): errors.New("error"), - reflect.TypeOf(state.Balance{}): state.Balance{Amount: sdk.NewInt(42), Denom: "utia"}, -} + ) -func init() { - addToExampleValues(share.EmptyEDS()) - addr, err := sdk.AccAddressFromBech32("celestia1377k5an3f94v6wyaceu0cf4nq6gk2jtpc46g7h") - if err != nil { - panic(err) - } - addToExampleValues(addr) - ExampleValues[reflect.TypeOf((*sdk.Address)(nil)).Elem()] = addr + add(errors.New("error")) + add(state.Balance{Amount: sdk.NewInt(42), Denom: "utia"}) + add(share.EmptyEDS()) + add(rsmt2d.Row) + add(network.Connected) + add(network.ReachabilityPrivate) - valAddr, err := sdk.ValAddressFromBech32("celestiavaloper1q3v5cugc8cdpud87u4zwy0a74uxkk6u4q4gx4p") - if err != nil { - panic(err) - } - addToExampleValues(valAddr) + addr := must(sdk.AccAddressFromBech32("celestia1377k5an3f94v6wyaceu0cf4nq6gk2jtpc46g7h")) + add(addr) + add(state.Address{Address: addr}) + exampleValues[reflect.TypeOf((*sdk.Address)(nil)).Elem()] = addr - addToExampleValues(state.Address{Address: addr}) + valAddr := must(sdk.ValAddressFromBech32("celestiavaloper1q3v5cugc8cdpud87u4zwy0a74uxkk6u4q4gx4p")) + add(valAddr) var txResponse *state.TxResponse - err = json.Unmarshal([]byte(exampleTxResponse), &txResponse) + err := json.Unmarshal([]byte(exampleTxResponse), &txResponse) if err != nil { panic(err) } + add(txResponse) var samplingStats das.SamplingStats err = json.Unmarshal([]byte(exampleSamplingStats), &samplingStats) if err != nil { panic(err) } + add(samplingStats) var extendedHeader *header.ExtendedHeader err = json.Unmarshal([]byte(exampleExtendedHeader), &extendedHeader) if err != nil { panic(err) } + add(extendedHeader) var resourceMngrStats rcmgr.ResourceManagerStat err = json.Unmarshal([]byte(exampleResourceMngrStats), &resourceMngrStats) if err != nil { panic(err) } + add(resourceMngrStats) var exBlob *blob.Blob err = json.Unmarshal([]byte(exampleBlob), &exBlob) if err != nil { panic(err) } + add(exBlob) + add(exBlob.Blob) var blobProof *blob.Proof err = json.Unmarshal([]byte(exampleBlobProof), &blobProof) if err != nil { panic(err) } - - addToExampleValues(exBlob) - addToExampleValues(exBlob.Blob) - addToExampleValues(blobProof) - addToExampleValues(txResponse) - addToExampleValues(samplingStats) - addToExampleValues(extendedHeader) - addToExampleValues(resourceMngrStats) + add(blobProof) mathInt, _ := math.NewIntFromString("42") - addToExampleValues(mathInt) - - addToExampleValues(network.Connected) - addToExampleValues(network.ReachabilityPrivate) + add(mathInt) pID := protocol.ID("/celestia/mocha/ipfs/bitswap") - addToExampleValues(pID) + add(pID) peerID := peer.ID("12D3KooWPRb5h3g9MH7sx9qfbSQZG5cXv1a2Qs3o4aW5YmmzPq82") - addToExampleValues(peerID) + add(peerID) ma, _ := multiaddr.NewMultiaddr("/ip6/::1/udp/2121/quic-v1") addrInfo := peer.AddrInfo{ ID: peerID, Addrs: []multiaddr.Multiaddr{ma}, } - addToExampleValues(addrInfo) + add(addrInfo) - commitment, err := base64.StdEncoding.DecodeString("aHlbp+J9yub6hw/uhK6dP8hBLR2mFy78XNRRdLf2794=") - if err != nil { - panic(err) - } - addToExampleValues(blob.Commitment(commitment)) + commitment := must(base64.StdEncoding.DecodeString("aHlbp+J9yub6hw/uhK6dP8hBLR2mFy78XNRRdLf2794=")) + add(blob.Commitment(commitment)) // randomly generated namespace that's used in the blob example above // (AAAAAAAAAAAAAAAAAAAAAAAAAAAAAMJ/xGlNMdE=) - namespace, err := libshare.NewV0Namespace([]byte{0xc2, 0x7f, 0xc4, 0x69, 0x4d, 0x31, 0xd1}) - if err != nil { - panic(err) - } - addToExampleValues(namespace) + namespace := must(libshare.NewV0Namespace([]byte{0xc2, 0x7f, 0xc4, 0x69, 0x4d, 0x31, 0xd1})) + add(namespace) hashStr := "453D0BC3CB88A2ED6F2E06021383B22C72D25D7741AE51B4CAE1AD34D72A3F07" - hash, err := hex.DecodeString(hashStr) - if err != nil { - panic(err) - } - addToExampleValues(libhead.Hash(hash)) + hash := must(hex.DecodeString(hashStr)) + add(libhead.Hash(hash)) - txConfig := state.NewTxConfig( + add(state.NewTxConfig( state.WithGasPrice(0.002), state.WithGas(142225), state.WithKeyName("my_celes_key"), state.WithSignerAddress("celestia1pjcmwj8w6hyr2c4wehakc5g8cfs36aysgucx66"), state.WithFeeGranterAddress("celestia1hakc56ax66ypjcmwj8w6hyr2c4g8cfs3wesguc"), - ) - addToExampleValues(txConfig) - - addToExampleValues(rsmt2d.Row) -} - -func addToExampleValues(v interface{}) { - ExampleValues[reflect.TypeOf(v)] = v + )) } -func ExampleValue(t, parent reflect.Type) (interface{}, error) { - v, ok := ExampleValues[t] +func exampleValue(t, parent reflect.Type) (any, error) { + v, ok := exampleValues[t] if ok { return v, nil } @@ -207,26 +194,26 @@ func ExampleValue(t, parent reflect.Type) (interface{}, error) { switch t.Kind() { case reflect.Slice: out := reflect.New(t).Elem() - val, err := ExampleValue(t.Elem(), t) + val, err := exampleValue(t.Elem(), t) if err != nil { return nil, err } out = reflect.Append(out, reflect.ValueOf(val)) return out.Interface(), nil case reflect.Chan: - return ExampleValue(t.Elem(), nil) + return exampleValue(t.Elem(), nil) case reflect.Struct: es, err := exampleStruct(t, parent) if err != nil { return nil, err } v := reflect.ValueOf(es).Elem().Interface() - ExampleValues[t] = v + exampleValues[t] = v return v, nil case reflect.Array: out := reflect.New(t).Elem() for i := 0; i < t.Len(); i++ { - val, err := ExampleValue(t.Elem(), t) + val, err := exampleValue(t.Elem(), t) if err != nil { return nil, err } @@ -248,7 +235,7 @@ func ExampleValue(t, parent reflect.Type) (interface{}, error) { return nil, fmt.Errorf("failed to retrieve example value for type: %s on parent '%s')", t, parent) } -func exampleStruct(t, parent reflect.Type) (interface{}, error) { +func exampleStruct(t, parent reflect.Type) (any, error) { ns := reflect.New(t) for i := 0; i < t.NumField(); i++ { f := t.Field(i) @@ -256,7 +243,7 @@ func exampleStruct(t, parent reflect.Type) (interface{}, error) { continue } if cases.Title(language.Und, cases.NoLower).String(f.Name) == f.Name { - val, err := ExampleValue(f.Type, t) + val, err := exampleValue(f.Type, t) if err != nil { return nil, err } @@ -266,3 +253,10 @@ func exampleStruct(t, parent reflect.Type) (interface{}, error) { return ns.Interface(), nil } + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/api/docgen/openrpc.go b/api/docgen/openrpc.go index a5e52e7ee1..86d901372f 100644 --- a/api/docgen/openrpc.go +++ b/api/docgen/openrpc.go @@ -185,7 +185,7 @@ func NewOpenRPCDocument(comments, permissions Comments) *go_openrpc_reflect.Docu } appReflector.FnSchemaExamples = func(ty reflect.Type) (examples *meta_schema.Examples, err error) { - v, err := ExampleValue(ty, ty) // This isn't ideal, but seems to work well enough. + v, err := exampleValue(ty, ty) // This isn't ideal, but seems to work well enough. if err != nil { fmt.Println(err) } From 87ece3a84e2ceb301edff6c4d0c8647c006ee28c Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:07:28 +0100 Subject: [PATCH 08/11] fix(nodebuilder/da): remove random print (#4029) --- nodebuilder/da/service.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/nodebuilder/da/service.go b/nodebuilder/da/service.go index aded1f0ea9..c7e403fbff 100644 --- a/nodebuilder/da/service.go +++ b/nodebuilder/da/service.go @@ -4,7 +4,6 @@ import ( "context" "encoding/binary" "encoding/json" - "fmt" "strings" logging "github.com/ipfs/go-log/v2" @@ -256,7 +255,6 @@ func (s *Service) Validate( // invalid proof") but analysis of the code in celestia-node implies this should never happen - // maybe it's caused by openrpc? there is no way of gently handling errors here, but returned // value is fine for us - fmt.Println("proof", proofs[i] == nil, "commitment", commitment == nil) isIncluded, _ := s.blobServ.Included(ctx, height, ns, proofs[i], commitment) included[i] = isIncluded } From c0055c51dad43a80927c0f107753b3c0ed753238 Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Thu, 9 Jan 2025 17:14:33 +0200 Subject: [PATCH 09/11] feat(modshare): implement GetRow (#4002) Co-authored-by: Hlib Kanunnikov --- nodebuilder/share/mocks/api.go | 15 ++++++ nodebuilder/share/share.go | 19 +++++++ share/availability/light/availability_test.go | 6 ++- share/shwap/getter.go | 2 + share/shwap/getters/cascade.go | 12 +++++ share/shwap/getters/mock/getter.go | 15 ++++++ share/shwap/getters/testing.go | 17 ++++++ share/shwap/p2p/bitswap/getter.go | 26 +++++++++ share/shwap/p2p/shrex/shrex_getter/shrex.go | 4 ++ share/shwap/row.go | 54 +++++++++++++++++++ share/shwap/row_test.go | 28 ++++++++++ store/getter.go | 15 ++++++ store/getter_test.go | 20 +++++++ 13 files changed, 232 insertions(+), 1 deletion(-) diff --git a/nodebuilder/share/mocks/api.go b/nodebuilder/share/mocks/api.go index 7fde2338cc..d392f39f5b 100644 --- a/nodebuilder/share/mocks/api.go +++ b/nodebuilder/share/mocks/api.go @@ -84,6 +84,21 @@ func (mr *MockModuleMockRecorder) GetRange(arg0, arg1, arg2, arg3 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRange", reflect.TypeOf((*MockModule)(nil).GetRange), arg0, arg1, arg2, arg3) } +// GetRow mocks base method. +func (m *MockModule) GetRow(arg0 context.Context, arg1 uint64, arg2 int) (shwap.Row, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRow", arg0, arg1, arg2) + ret0, _ := ret[0].(shwap.Row) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetRow indicates an expected call of GetRow. +func (mr *MockModuleMockRecorder) GetRow(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRow", reflect.TypeOf((*MockModule)(nil).GetRow), arg0, arg1, arg2) +} + // GetSamples mocks base method. func (m *MockModule) GetSamples(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 []shwap.SampleCoords) ([]shwap.Sample, error) { m.ctrl.T.Helper() diff --git a/nodebuilder/share/share.go b/nodebuilder/share/share.go index 783018fe47..46df1e2379 100644 --- a/nodebuilder/share/share.go +++ b/nodebuilder/share/share.go @@ -50,6 +50,8 @@ type Module interface { GetSamples(ctx context.Context, header *header.ExtendedHeader, indices []shwap.SampleCoords) ([]shwap.Sample, error) // GetEDS gets the full EDS identified by the given extended header. GetEDS(ctx context.Context, height uint64) (*rsmt2d.ExtendedDataSquare, error) + // GetRow gets all shares from specified row. + GetRow(context.Context, uint64, int) (shwap.Row, error) // GetNamespaceData gets all shares from an EDS within the given namespace. // Shares are returned in a row-by-row order if the namespace spans multiple rows. GetNamespaceData( @@ -77,6 +79,11 @@ type API struct { ctx context.Context, height uint64, ) (*rsmt2d.ExtendedDataSquare, error) `perm:"read"` + GetRow func( + context.Context, + uint64, + int, + ) (shwap.Row, error) `perm:"read"` GetNamespaceData func( ctx context.Context, height uint64, @@ -108,6 +115,10 @@ func (api *API) GetEDS(ctx context.Context, height uint64) (*rsmt2d.ExtendedData return api.Internal.GetEDS(ctx, height) } +func (api *API) GetRow(ctx context.Context, height uint64, rowIdx int) (shwap.Row, error) { + return api.Internal.GetRow(ctx, height, rowIdx) +} + func (api *API) GetRange(ctx context.Context, height uint64, start, end int) (*GetRangeResult, error) { return api.Internal.GetRange(ctx, height, start, end) } @@ -196,3 +207,11 @@ func (m module) GetNamespaceData( } return m.getter.GetNamespaceData(ctx, header, namespace) } + +func (m module) GetRow(ctx context.Context, height uint64, rowIdx int) (shwap.Row, error) { + header, err := m.hs.GetByHeight(ctx, height) + if err != nil { + return shwap.Row{}, err + } + return m.getter.GetRow(ctx, header, rowIdx) +} diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 191c286938..df441c46a1 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -290,7 +290,7 @@ func (g successGetter) checkOnce(t *testing.T) { } } -func (g successGetter) GetSamples(_ context.Context, hdr *header.ExtendedHeader, +func (g successGetter) GetSamples(_ context.Context, _ *header.ExtendedHeader, indices []shwap.SampleCoords, ) ([]shwap.Sample, error) { g.Lock() @@ -305,6 +305,10 @@ func (g successGetter) GetSamples(_ context.Context, hdr *header.ExtendedHeader, return smpls, nil } +func (g successGetter) GetRow(_ context.Context, _ *header.ExtendedHeader, _ int) (shwap.Row, error) { + panic("not implemented") +} + func (g successGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { panic("not implemented") } diff --git a/share/shwap/getter.go b/share/shwap/getter.go index 9e0a5d3131..f56c8edc63 100644 --- a/share/shwap/getter.go +++ b/share/shwap/getter.go @@ -39,6 +39,8 @@ type Getter interface { // GetEDS gets the full EDS identified by the given extended header. GetEDS(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) + // GetRow gets Row by its index committed to the given extended header. + GetRow(ctx context.Context, header *header.ExtendedHeader, rowIdx int) (Row, error) // GetNamespaceData gets all shares from an EDS within the given namespace. // Shares are returned in a row-by-row order if the namespace spans multiple rows. // Inclusion of returned data could be verified using Verify method on NamespacedShares. diff --git a/share/shwap/getters/cascade.go b/share/shwap/getters/cascade.go index 39ceb2fdb1..c9ac091ddd 100644 --- a/share/shwap/getters/cascade.go +++ b/share/shwap/getters/cascade.go @@ -71,6 +71,18 @@ func (cg *CascadeGetter) GetEDS( return cascadeGetters(ctx, cg.getters, get) } +// GetRow gets row shares from any of registered shwap.Getters in cascading +// order. +func (cg *CascadeGetter) GetRow(ctx context.Context, header *header.ExtendedHeader, rowIdx int) (shwap.Row, error) { + ctx, span := tracer.Start(ctx, "cascade/get-row") + defer span.End() + + get := func(ctx context.Context, get shwap.Getter) (shwap.Row, error) { + return get.GetRow(ctx, header, rowIdx) + } + return cascadeGetters(ctx, cg.getters, get) +} + // GetNamespaceData gets NamespacedShares from any of registered shwap.Getters in cascading // order. func (cg *CascadeGetter) GetNamespaceData( diff --git a/share/shwap/getters/mock/getter.go b/share/shwap/getters/mock/getter.go index 7e4dacb24a..ade35b39f8 100644 --- a/share/shwap/getters/mock/getter.go +++ b/share/shwap/getters/mock/getter.go @@ -68,6 +68,21 @@ func (mr *MockGetterMockRecorder) GetNamespaceData(arg0, arg1, arg2 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNamespaceData", reflect.TypeOf((*MockGetter)(nil).GetNamespaceData), arg0, arg1, arg2) } +// GetRow mocks base method. +func (m *MockGetter) GetRow(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 int) (shwap.Row, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRow", arg0, arg1, arg2) + ret0, _ := ret[0].(shwap.Row) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetRow indicates an expected call of GetRow. +func (mr *MockGetterMockRecorder) GetRow(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRow", reflect.TypeOf((*MockGetter)(nil).GetRow), arg0, arg1, arg2) +} + // GetSamples mocks base method. func (m *MockGetter) GetSamples(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 []shwap.SampleCoords) ([]shwap.Sample, error) { m.ctrl.T.Helper() diff --git a/share/shwap/getters/testing.go b/share/shwap/getters/testing.go index a3ee53753d..c244204aba 100644 --- a/share/shwap/getters/testing.go +++ b/share/shwap/getters/testing.go @@ -58,6 +58,23 @@ func (seg *SingleEDSGetter) GetSamples(ctx context.Context, hdr *header.Extended return smpls, nil } +func (seg *SingleEDSGetter) GetRow( + ctx context.Context, + header *header.ExtendedHeader, + rowIdx int, +) (shwap.Row, error) { + err := seg.checkRoots(header.DAH) + if err != nil { + return shwap.Row{}, err + } + + axisHalf, err := seg.EDS.AxisHalf(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return shwap.Row{}, err + } + return axisHalf.ToRow(), nil +} + // GetEDS returns a kept EDS if the correct root is given. func (seg *SingleEDSGetter) GetEDS( _ context.Context, diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index db2938663c..f008809833 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -134,6 +134,32 @@ func (g *Getter) GetSamples( return smpls, nil } +func (g *Getter) GetRow(ctx context.Context, hdr *header.ExtendedHeader, rowIdx int) (shwap.Row, error) { + ctx, span := tracer.Start(ctx, "get-eds") + defer span.End() + + blk, err := NewEmptyRowBlock(hdr.Height(), rowIdx, len(hdr.DAH.RowRoots)) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "NewEmptyRowBlock") + return shwap.Row{}, err + } + + isArchival := g.isArchival(hdr) + span.SetAttributes(attribute.Bool("is_archival", isArchival)) + + ses, release := g.getSession(isArchival) + defer release() + + err = Fetch(ctx, g.exchange, hdr.DAH, []Block{blk}, WithFetcher(ses)) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "Fetch") + return shwap.Row{}, err + } + return blk.Container, nil +} + // GetEDS uses [RowBlock] and [Fetch] to get half of the first EDS quadrant(ODS) and // recomputes the whole EDS from it. // We fetch the ODS or Q1 to ensure better compatibility with archival nodes that only diff --git a/share/shwap/p2p/shrex/shrex_getter/shrex.go b/share/shwap/p2p/shrex/shrex_getter/shrex.go index 6c91a44736..142b3f3ab2 100644 --- a/share/shwap/p2p/shrex/shrex_getter/shrex.go +++ b/share/shwap/p2p/shrex/shrex_getter/shrex.go @@ -150,6 +150,10 @@ func (sg *Getter) GetSamples(context.Context, *header.ExtendedHeader, []shwap.Sa return nil, fmt.Errorf("getter/shrex: GetShare %w", shwap.ErrOperationNotSupported) } +func (sg *Getter) GetRow(_ context.Context, _ *header.ExtendedHeader, _ int) (shwap.Row, error) { + return shwap.Row{}, fmt.Errorf("getter/shrex: GetRow %w", shwap.ErrOperationNotSupported) +} + func (sg *Getter) GetEDS(ctx context.Context, header *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { var err error ctx, span := tracer.Start(ctx, "shrex/get-eds") diff --git a/share/shwap/row.go b/share/shwap/row.go index 7057680573..87faff4875 100644 --- a/share/shwap/row.go +++ b/share/shwap/row.go @@ -2,6 +2,7 @@ package shwap import ( "bytes" + "encoding/json" "fmt" "github.com/celestiaorg/celestia-app/v3/pkg/wrapper" @@ -171,6 +172,33 @@ func (r *Row) verifyInclusion(roots *share.AxisRoots, idx int) error { return nil } +// MarshalJSON encodes row to the json encoded bytes. +func (r Row) MarshalJSON() ([]byte, error) { + jsonRow := struct { + Shares []libshare.Share `json:"shares"` + Side string `json:"side"` + }{ + Shares: r.shares, + Side: r.side.String(), + } + return json.Marshal(&jsonRow) +} + +// UnmarshalJSON decodes json bytes to the row. +func (r *Row) UnmarshalJSON(data []byte) error { + jsonRow := struct { + Shares []libshare.Share `json:"shares"` + Side string `json:"side"` + }{} + err := json.Unmarshal(data, &jsonRow) + if err != nil { + return err + } + r.shares = jsonRow.Shares + r.side = toRowSide(jsonRow.Side) + return nil +} + // ToProto converts a RowSide to its protobuf representation. func (s RowSide) ToProto() pb.Row_HalfSide { if s == Left { @@ -186,3 +214,29 @@ func sideFromProto(side pb.Row_HalfSide) RowSide { } return Right } + +func (s RowSide) String() string { + switch s { + case Left: + return "LEFT" + case Right: + return "RIGHT" + case Both: + return "BOTH" + default: + panic("invalid row side") + } +} + +func toRowSide(s string) RowSide { + switch s { + case "LEFT": + return Left + case "RIGHT": + return Right + case "BOTH": + return Both + default: + panic("invalid row side") + } +} diff --git a/share/shwap/row_test.go b/share/shwap/row_test.go index 16bce3893b..3cf80befca 100644 --- a/share/shwap/row_test.go +++ b/share/shwap/row_test.go @@ -1,6 +1,7 @@ package shwap import ( + "encoding/json" "testing" "github.com/stretchr/testify/require" @@ -29,6 +30,33 @@ func TestRowShares(t *testing.T) { } } +func TestRowMarshal(t *testing.T) { + const odsSize = 8 + eds := edstest.RandEDS(t, odsSize) + for rowIdx := 0; rowIdx < odsSize*2; rowIdx++ { + for _, side := range []RowSide{Left, Right, Both} { + row, err := RowFromEDS(eds, rowIdx, side) + require.NoError(t, err) + rowData, err := json.Marshal(row) + require.NoError(t, err) + + decodedRow := &Row{} + err = json.Unmarshal(rowData, decodedRow) + require.NoError(t, err) + + require.Equal(t, side, decodedRow.side) + extended, err := decodedRow.Shares() + require.NoError(t, err) + + shares, err := row.Shares() + require.NoError(t, err) + + require.Equal(t, shares, extended) + require.Equal(t, row.side, decodedRow.side) + } + } +} + func TestRowValidate(t *testing.T) { const odsSize = 8 eds := edstest.RandEDS(t, odsSize) diff --git a/store/getter.go b/store/getter.go index 1315561730..7a94c408ac 100644 --- a/store/getter.go +++ b/store/getter.go @@ -71,6 +71,21 @@ func (g *Getter) GetEDS(ctx context.Context, h *header.ExtendedHeader) (*rsmt2d. return rsmt2d.ExtendedDataSquare, nil } +func (g *Getter) GetRow(ctx context.Context, h *header.ExtendedHeader, rowIdx int) (shwap.Row, error) { + acc, err := g.store.GetByHeight(ctx, h.Height()) + if err != nil { + if errors.Is(err, ErrNotFound) { + return shwap.Row{}, shwap.ErrNotFound + } + return shwap.Row{}, fmt.Errorf("getting accessor from store: %w", err) + } + axisHalf, err := acc.AxisHalf(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return shwap.Row{}, fmt.Errorf("getting axis half from accessor: %w", err) + } + return axisHalf.ToRow(), nil +} + func (g *Getter) GetNamespaceData( ctx context.Context, h *header.ExtendedHeader, diff --git a/store/getter_test.go b/store/getter_test.go index b0b027fc19..0042f43ecf 100644 --- a/store/getter_test.go +++ b/store/getter_test.go @@ -69,6 +69,26 @@ func TestStoreGetter(t *testing.T) { require.ErrorIs(t, err, shwap.ErrNotFound) }) + t.Run("GetRow", func(t *testing.T) { + eds, roots := randomEDS(t) + eh := headertest.RandExtendedHeaderWithRoot(t, roots) + height := height.Add(1) + eh.RawHeader.Height = int64(height) + + err := edsStore.PutODSQ4(ctx, eh.DAH, height, eds) + require.NoError(t, err) + + for i := 0; i < len(eh.DAH.RowRoots); i++ { + row, err := sg.GetRow(ctx, eh, i) + require.NoError(t, err) + retreivedShrs, err := row.Shares() + require.NoError(t, err) + edsShrs, err := libshare.FromBytes(eds.Row(uint(i))) + require.NoError(t, err) + require.Equal(t, edsShrs, retreivedShrs) + } + }) + t.Run("GetNamespaceData", func(t *testing.T) { ns := libshare.RandomNamespace() eds, roots := edstest.RandEDSWithNamespace(t, ns, 8, 16) From 66b94a84fa22dc681dcbae082e70a87d8b989127 Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Mon, 13 Jan 2025 11:04:02 +0100 Subject: [PATCH 10/11] Revert "fix(p2p): disable quic (#3937)" (#4039) This reverts commit e6dbb54dd28462c2820ae455295acc856af92b64 as https://github.com/quic-go/quic-go/issues/4712 is solved and fixed. It's not yet released, but the bug is not critical to block on it. --- nodebuilder/p2p/addrs.go | 17 +++-------------- nodebuilder/p2p/host.go | 24 +++++++----------------- 2 files changed, 10 insertions(+), 31 deletions(-) diff --git a/nodebuilder/p2p/addrs.go b/nodebuilder/p2p/addrs.go index 607d63c2bd..1a928e3be2 100644 --- a/nodebuilder/p2p/addrs.go +++ b/nodebuilder/p2p/addrs.go @@ -2,7 +2,6 @@ package p2p import ( "fmt" - "slices" p2pconfig "github.com/libp2p/go-libp2p/config" hst "github.com/libp2p/go-libp2p/core/host" @@ -12,22 +11,12 @@ import ( // Listen returns invoke function that starts listening for inbound connections with libp2p.Host. func Listen(cfg *Config) func(h hst.Host) (err error) { return func(h hst.Host) (err error) { - maListen := make([]ma.Multiaddr, 0, len(cfg.ListenAddresses)) - for _, addr := range cfg.ListenAddresses { - maddr, err := ma.NewMultiaddr(addr) + maListen := make([]ma.Multiaddr, len(cfg.ListenAddresses)) + for i, addr := range cfg.ListenAddresses { + maListen[i], err = ma.NewMultiaddr(addr) if err != nil { return fmt.Errorf("failure to parse config.P2P.ListenAddresses: %w", err) } - if !enableQUIC { - // TODO(@walldiss): Remove this check when QUIC is stable - if slices.ContainsFunc(maddr.Protocols(), func(p ma.Protocol) bool { - return p.Code == ma.P_QUIC_V1 || p.Code == ma.P_WEBTRANSPORT - }) { - continue - } - } - - maListen = append(maListen, maddr) } return h.Network().Listen(maListen...) } diff --git a/nodebuilder/p2p/host.go b/nodebuilder/p2p/host.go index c38713f6a3..1003970199 100644 --- a/nodebuilder/p2p/host.go +++ b/nodebuilder/p2p/host.go @@ -3,7 +3,6 @@ package p2p import ( "context" "fmt" - "os" "strings" "github.com/libp2p/go-libp2p" @@ -28,8 +27,6 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/node" ) -var enableQUIC = os.Getenv("CELESTIA_ENABLE_QUIC") == "1" - // routedHost constructs a wrapped Host that may fallback to address discovery, // if any top-level operation on the Host is provided with PeerID(Hash(PbK)) only. func routedHost(base HostBase, r routing.PeerRouting) hst.Host { @@ -83,19 +80,6 @@ func host(params hostParams) (HostBase, error) { params.Cfg.Upgrade() } - transports := []libp2p.Option{ - libp2p.Transport(tcp.NewTCPTransport), - libp2p.Transport(libp2pwebrtc.New), - wsTransport(tlsCfg), - } - - // disable quic and webtransport client support until it is stable - if enableQUIC { - transports = append(transports, - libp2p.Transport(quic.NewTransport), - libp2p.Transport(webtransport.New)) - } - opts := []libp2p.Option{ libp2p.NoListenAddrs, // do not listen automatically libp2p.AddrsFactory(params.AddrF), @@ -108,7 +92,13 @@ func host(params hostParams) (HostBase, error) { libp2p.DisableRelay(), libp2p.BandwidthReporter(params.Bandwidth), libp2p.ResourceManager(params.ResourceManager), - libp2p.ChainOptions(transports...), + libp2p.ChainOptions( + libp2p.Transport(tcp.NewTCPTransport), + libp2p.Transport(quic.NewTransport), + libp2p.Transport(webtransport.New), + libp2p.Transport(libp2pwebrtc.New), + wsTransport(tlsCfg), + ), // to clearly define what defaults we rely upon libp2p.DefaultSecurity, libp2p.DefaultMuxers, From c599285bba4d46cc945d094bcf1a22644bf34bae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=8B=E3=81=92?= <47621124+ronething-bot@users.noreply.github.com> Date: Wed, 15 Jan 2025 16:42:21 +0800 Subject: [PATCH 11/11] chore(pruner): remove SpacedHeaderGenerator (#4032) --- header/headertest/testing.go | 19 ++++++++++++++++--- pruner/service_test.go | 33 ++------------------------------- 2 files changed, 18 insertions(+), 34 deletions(-) diff --git a/header/headertest/testing.go b/header/headertest/testing.go index 251592a9a5..2e25592f27 100644 --- a/header/headertest/testing.go +++ b/header/headertest/testing.go @@ -15,7 +15,6 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/proto/tendermint/version" "github.com/tendermint/tendermint/types" - tmtime "github.com/tendermint/tendermint/types/time" "github.com/celestiaorg/celestia-app/v3/pkg/da" libhead "github.com/celestiaorg/go-header" @@ -40,6 +39,7 @@ type TestSuite struct { // blockTime is optional - if set, the test suite will generate // blocks timestamped at the specified interval blockTime time.Duration + startTime time.Time } func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] { @@ -62,6 +62,18 @@ func NewTestSuite(t *testing.T, numValidators int, blockTime time.Duration) *Tes vals: vals, valSet: valSet, blockTime: blockTime, + startTime: time.Now(), + } +} + +func NewTestSuiteWithGenesisTime(t *testing.T, startTime time.Time, blockTime time.Duration) *TestSuite { + valSet, vals := RandValidatorSet(3, 1) + return &TestSuite{ + t: t, + vals: vals, + valSet: valSet, + blockTime: blockTime, + startTime: startTime, } } @@ -74,10 +86,11 @@ func (s *TestSuite) genesis() *header.ExtendedHeader { gen.ValidatorsHash = s.valSet.Hash() gen.NextValidatorsHash = s.valSet.Hash() gen.Height = 1 + gen.Time = s.startTime voteSet := types.NewVoteSet(gen.ChainID, gen.Height, 0, tmproto.PrecommitType, s.valSet) blockID := RandBlockID(s.t) blockID.Hash = gen.Hash() - commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, s.vals, time.Now()) + commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, s.vals, s.startTime) require.NoError(s.t, err) eh := &header.ExtendedHeader{ @@ -199,7 +212,7 @@ func (s *TestSuite) Commit(h *header.RawHeader) *types.Commit { ValidatorIndex: int32(i), Height: h.Height, Round: round, - Timestamp: tmtime.Now().UTC(), + Timestamp: h.Time, Type: tmproto.PrecommitType, BlockID: bid, } diff --git a/pruner/service_test.go b/pruner/service_test.go index c7ebeac03a..75e0e0198e 100644 --- a/pruner/service_test.go +++ b/pruner/service_test.go @@ -248,8 +248,8 @@ func TestFindPruneableHeaders(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - headerGenerator := NewSpacedHeaderGenerator(t, tc.startTime, tc.blockTime) - store := headertest.NewCustomStore(t, headerGenerator, tc.headerAmount) + suite := headertest.NewTestSuiteWithGenesisTime(t, tc.startTime, tc.blockTime) + store := headertest.NewCustomStore(t, suite, tc.headerAmount) mp := &mockPruner{} @@ -317,32 +317,3 @@ func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error { mp.deletedHeaderHashes = append(mp.deletedHeaderHashes, pruned{hash: h.Hash().String(), height: h.Height()}) return nil } - -// TODO @renaynay @distractedm1nd: Deduplicate via headertest utility. -// https://github.com/celestiaorg/celestia-node/issues/3278. -type SpacedHeaderGenerator struct { - t *testing.T - TimeBetweenHeaders time.Duration - currentTime time.Time - currentHeight int64 -} - -func NewSpacedHeaderGenerator( - t *testing.T, startTime time.Time, timeBetweenHeaders time.Duration, -) *SpacedHeaderGenerator { - return &SpacedHeaderGenerator{ - t: t, - TimeBetweenHeaders: timeBetweenHeaders, - currentTime: startTime, - currentHeight: 1, - } -} - -func (shg *SpacedHeaderGenerator) NextHeader() *header.ExtendedHeader { - h := headertest.RandExtendedHeaderAtTimestamp(shg.t, shg.currentTime) - h.RawHeader.Height = shg.currentHeight - h.RawHeader.Time = shg.currentTime - shg.currentHeight++ - shg.currentTime = shg.currentTime.Add(shg.TimeBetweenHeaders) - return h -}