Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg authored Jan 15, 2025
2 parents a232746 + c599285 commit 37a4db3
Show file tree
Hide file tree
Showing 18 changed files with 260 additions and 68 deletions.
19 changes: 16 additions & 3 deletions header/headertest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/proto/tendermint/version"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"

"github.com/celestiaorg/celestia-app/v3/pkg/da"
libhead "github.com/celestiaorg/go-header"
Expand All @@ -40,6 +39,7 @@ type TestSuite struct {
// blockTime is optional - if set, the test suite will generate
// blocks timestamped at the specified interval
blockTime time.Duration
startTime time.Time
}

func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] {
Expand All @@ -62,6 +62,18 @@ func NewTestSuite(t *testing.T, numValidators int, blockTime time.Duration) *Tes
vals: vals,
valSet: valSet,
blockTime: blockTime,
startTime: time.Now(),
}
}

func NewTestSuiteWithGenesisTime(t *testing.T, startTime time.Time, blockTime time.Duration) *TestSuite {
valSet, vals := RandValidatorSet(3, 1)
return &TestSuite{
t: t,
vals: vals,
valSet: valSet,
blockTime: blockTime,
startTime: startTime,
}
}

Expand All @@ -74,10 +86,11 @@ func (s *TestSuite) genesis() *header.ExtendedHeader {
gen.ValidatorsHash = s.valSet.Hash()
gen.NextValidatorsHash = s.valSet.Hash()
gen.Height = 1
gen.Time = s.startTime
voteSet := types.NewVoteSet(gen.ChainID, gen.Height, 0, tmproto.PrecommitType, s.valSet)
blockID := RandBlockID(s.t)
blockID.Hash = gen.Hash()
commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, s.vals, time.Now())
commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, s.vals, s.startTime)
require.NoError(s.t, err)

eh := &header.ExtendedHeader{
Expand Down Expand Up @@ -199,7 +212,7 @@ func (s *TestSuite) Commit(h *header.RawHeader) *types.Commit {
ValidatorIndex: int32(i),
Height: h.Height,
Round: round,
Timestamp: tmtime.Now().UTC(),
Timestamp: h.Time,
Type: tmproto.PrecommitType,
BlockID: bid,
}
Expand Down
2 changes: 0 additions & 2 deletions nodebuilder/da/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"strings"

logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -256,7 +255,6 @@ func (s *Service) Validate(
// invalid proof") but analysis of the code in celestia-node implies this should never happen -
// maybe it's caused by openrpc? there is no way of gently handling errors here, but returned
// value is fine for us
fmt.Println("proof", proofs[i] == nil, "commitment", commitment == nil)
isIncluded, _ := s.blobServ.Included(ctx, height, ns, proofs[i], commitment)
included[i] = isIncluded
}
Expand Down
17 changes: 3 additions & 14 deletions nodebuilder/p2p/addrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"fmt"
"slices"

p2pconfig "github.com/libp2p/go-libp2p/config"
hst "github.com/libp2p/go-libp2p/core/host"
Expand All @@ -12,22 +11,12 @@ import (
// Listen returns invoke function that starts listening for inbound connections with libp2p.Host.
func Listen(cfg *Config) func(h hst.Host) (err error) {
return func(h hst.Host) (err error) {
maListen := make([]ma.Multiaddr, 0, len(cfg.ListenAddresses))
for _, addr := range cfg.ListenAddresses {
maddr, err := ma.NewMultiaddr(addr)
maListen := make([]ma.Multiaddr, len(cfg.ListenAddresses))
for i, addr := range cfg.ListenAddresses {
maListen[i], err = ma.NewMultiaddr(addr)
if err != nil {
return fmt.Errorf("failure to parse config.P2P.ListenAddresses: %w", err)
}
if !enableQUIC {
// TODO(@walldiss): Remove this check when QUIC is stable
if slices.ContainsFunc(maddr.Protocols(), func(p ma.Protocol) bool {
return p.Code == ma.P_QUIC_V1 || p.Code == ma.P_WEBTRANSPORT
}) {
continue
}
}

maListen = append(maListen, maddr)
}
return h.Network().Listen(maListen...)
}
Expand Down
24 changes: 7 additions & 17 deletions nodebuilder/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package p2p
import (
"context"
"fmt"
"os"
"strings"

"github.com/libp2p/go-libp2p"
Expand All @@ -28,8 +27,6 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

var enableQUIC = os.Getenv("CELESTIA_ENABLE_QUIC") == "1"

// routedHost constructs a wrapped Host that may fallback to address discovery,
// if any top-level operation on the Host is provided with PeerID(Hash(PbK)) only.
func routedHost(base HostBase, r routing.PeerRouting) hst.Host {
Expand Down Expand Up @@ -83,19 +80,6 @@ func host(params hostParams) (HostBase, error) {
params.Cfg.Upgrade()
}

transports := []libp2p.Option{
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(libp2pwebrtc.New),
wsTransport(tlsCfg),
}

// disable quic and webtransport client support until it is stable
if enableQUIC {
transports = append(transports,
libp2p.Transport(quic.NewTransport),
libp2p.Transport(webtransport.New))
}

opts := []libp2p.Option{
libp2p.NoListenAddrs, // do not listen automatically
libp2p.AddrsFactory(params.AddrF),
Expand All @@ -108,7 +92,13 @@ func host(params hostParams) (HostBase, error) {
libp2p.DisableRelay(),
libp2p.BandwidthReporter(params.Bandwidth),
libp2p.ResourceManager(params.ResourceManager),
libp2p.ChainOptions(transports...),
libp2p.ChainOptions(
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(quic.NewTransport),
libp2p.Transport(webtransport.New),
libp2p.Transport(libp2pwebrtc.New),
wsTransport(tlsCfg),
),
// to clearly define what defaults we rely upon
libp2p.DefaultSecurity,
libp2p.DefaultMuxers,
Expand Down
15 changes: 15 additions & 0 deletions nodebuilder/share/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions nodebuilder/share/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Module interface {
GetSamples(ctx context.Context, header *header.ExtendedHeader, indices []shwap.SampleCoords) ([]shwap.Sample, error)
// GetEDS gets the full EDS identified by the given extended header.
GetEDS(ctx context.Context, height uint64) (*rsmt2d.ExtendedDataSquare, error)
// GetRow gets all shares from specified row.
GetRow(context.Context, uint64, int) (shwap.Row, error)
// GetNamespaceData gets all shares from an EDS within the given namespace.
// Shares are returned in a row-by-row order if the namespace spans multiple rows.
GetNamespaceData(
Expand Down Expand Up @@ -77,6 +79,11 @@ type API struct {
ctx context.Context,
height uint64,
) (*rsmt2d.ExtendedDataSquare, error) `perm:"read"`
GetRow func(
context.Context,
uint64,
int,
) (shwap.Row, error) `perm:"read"`
GetNamespaceData func(
ctx context.Context,
height uint64,
Expand Down Expand Up @@ -108,6 +115,10 @@ func (api *API) GetEDS(ctx context.Context, height uint64) (*rsmt2d.ExtendedData
return api.Internal.GetEDS(ctx, height)
}

func (api *API) GetRow(ctx context.Context, height uint64, rowIdx int) (shwap.Row, error) {
return api.Internal.GetRow(ctx, height, rowIdx)
}

func (api *API) GetRange(ctx context.Context, height uint64, start, end int) (*GetRangeResult, error) {
return api.Internal.GetRange(ctx, height, start, end)
}
Expand Down Expand Up @@ -196,3 +207,11 @@ func (m module) GetNamespaceData(
}
return m.getter.GetNamespaceData(ctx, header, namespace)
}

func (m module) GetRow(ctx context.Context, height uint64, rowIdx int) (shwap.Row, error) {
header, err := m.hs.GetByHeight(ctx, height)
if err != nil {
return shwap.Row{}, err
}
return m.getter.GetRow(ctx, header, rowIdx)
}
33 changes: 2 additions & 31 deletions pruner/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ func TestFindPruneableHeaders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

headerGenerator := NewSpacedHeaderGenerator(t, tc.startTime, tc.blockTime)
store := headertest.NewCustomStore(t, headerGenerator, tc.headerAmount)
suite := headertest.NewTestSuiteWithGenesisTime(t, tc.startTime, tc.blockTime)
store := headertest.NewCustomStore(t, suite, tc.headerAmount)

mp := &mockPruner{}

Expand Down Expand Up @@ -317,32 +317,3 @@ func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error {
mp.deletedHeaderHashes = append(mp.deletedHeaderHashes, pruned{hash: h.Hash().String(), height: h.Height()})
return nil
}

// TODO @renaynay @distractedm1nd: Deduplicate via headertest utility.
// https://github.com/celestiaorg/celestia-node/issues/3278.
type SpacedHeaderGenerator struct {
t *testing.T
TimeBetweenHeaders time.Duration
currentTime time.Time
currentHeight int64
}

func NewSpacedHeaderGenerator(
t *testing.T, startTime time.Time, timeBetweenHeaders time.Duration,
) *SpacedHeaderGenerator {
return &SpacedHeaderGenerator{
t: t,
TimeBetweenHeaders: timeBetweenHeaders,
currentTime: startTime,
currentHeight: 1,
}
}

func (shg *SpacedHeaderGenerator) NextHeader() *header.ExtendedHeader {
h := headertest.RandExtendedHeaderAtTimestamp(shg.t, shg.currentTime)
h.RawHeader.Height = shg.currentHeight
h.RawHeader.Time = shg.currentTime
shg.currentHeight++
shg.currentTime = shg.currentTime.Add(shg.TimeBetweenHeaders)
return h
}
6 changes: 5 additions & 1 deletion share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (g successGetter) checkOnce(t *testing.T) {
}
}

func (g successGetter) GetSamples(_ context.Context, hdr *header.ExtendedHeader,
func (g successGetter) GetSamples(_ context.Context, _ *header.ExtendedHeader,
indices []shwap.SampleCoords,
) ([]shwap.Sample, error) {
g.Lock()
Expand All @@ -305,6 +305,10 @@ func (g successGetter) GetSamples(_ context.Context, hdr *header.ExtendedHeader,
return smpls, nil
}

func (g successGetter) GetRow(_ context.Context, _ *header.ExtendedHeader, _ int) (shwap.Row, error) {
panic("not implemented")
}

func (g successGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) {
panic("not implemented")
}
Expand Down
2 changes: 2 additions & 0 deletions share/shwap/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Getter interface {
// GetEDS gets the full EDS identified by the given extended header.
GetEDS(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error)

// GetRow gets Row by its index committed to the given extended header.
GetRow(ctx context.Context, header *header.ExtendedHeader, rowIdx int) (Row, error)
// GetNamespaceData gets all shares from an EDS within the given namespace.
// Shares are returned in a row-by-row order if the namespace spans multiple rows.
// Inclusion of returned data could be verified using Verify method on NamespacedShares.
Expand Down
12 changes: 12 additions & 0 deletions share/shwap/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ func (cg *CascadeGetter) GetEDS(
return cascadeGetters(ctx, cg.getters, get)
}

// GetRow gets row shares from any of registered shwap.Getters in cascading
// order.
func (cg *CascadeGetter) GetRow(ctx context.Context, header *header.ExtendedHeader, rowIdx int) (shwap.Row, error) {
ctx, span := tracer.Start(ctx, "cascade/get-row")
defer span.End()

get := func(ctx context.Context, get shwap.Getter) (shwap.Row, error) {
return get.GetRow(ctx, header, rowIdx)
}
return cascadeGetters(ctx, cg.getters, get)
}

// GetNamespaceData gets NamespacedShares from any of registered shwap.Getters in cascading
// order.
func (cg *CascadeGetter) GetNamespaceData(
Expand Down
15 changes: 15 additions & 0 deletions share/shwap/getters/mock/getter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions share/shwap/getters/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,23 @@ func (seg *SingleEDSGetter) GetSamples(ctx context.Context, hdr *header.Extended
return smpls, nil
}

func (seg *SingleEDSGetter) GetRow(
ctx context.Context,
header *header.ExtendedHeader,
rowIdx int,
) (shwap.Row, error) {
err := seg.checkRoots(header.DAH)
if err != nil {
return shwap.Row{}, err
}

axisHalf, err := seg.EDS.AxisHalf(ctx, rsmt2d.Row, rowIdx)
if err != nil {
return shwap.Row{}, err
}
return axisHalf.ToRow(), nil
}

// GetEDS returns a kept EDS if the correct root is given.
func (seg *SingleEDSGetter) GetEDS(
_ context.Context,
Expand Down
26 changes: 26 additions & 0 deletions share/shwap/p2p/bitswap/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,32 @@ func (g *Getter) GetSamples(
return smpls, nil
}

func (g *Getter) GetRow(ctx context.Context, hdr *header.ExtendedHeader, rowIdx int) (shwap.Row, error) {
ctx, span := tracer.Start(ctx, "get-eds")
defer span.End()

blk, err := NewEmptyRowBlock(hdr.Height(), rowIdx, len(hdr.DAH.RowRoots))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "NewEmptyRowBlock")
return shwap.Row{}, err
}

isArchival := g.isArchival(hdr)
span.SetAttributes(attribute.Bool("is_archival", isArchival))

ses, release := g.getSession(isArchival)
defer release()

err = Fetch(ctx, g.exchange, hdr.DAH, []Block{blk}, WithFetcher(ses))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Fetch")
return shwap.Row{}, err
}
return blk.Container, nil
}

// GetEDS uses [RowBlock] and [Fetch] to get half of the first EDS quadrant(ODS) and
// recomputes the whole EDS from it.
// We fetch the ODS or Q1 to ensure better compatibility with archival nodes that only
Expand Down
Loading

0 comments on commit 37a4db3

Please sign in to comment.