Skip to content

Commit

Permalink
Merge branch 'main' into blackbox-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
derrandz committed Feb 17, 2023
2 parents c10a17b + dd44463 commit e0ee3b9
Show file tree
Hide file tree
Showing 79 changed files with 2,539 additions and 546 deletions.
26 changes: 26 additions & 0 deletions core/eds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package core

import (
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/pkg/da"
appshares "github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/libs/utils"
)

// extendBlock extends the given block data, returning the resulting
// ExtendedDataSquare (EDS). If there are no transactions in the block,
// nil is returned in place of the eds.
func extendBlock(data types.Data) (*rsmt2d.ExtendedDataSquare, error) {
if len(data.Txs) == 0 {
return nil, nil
}
shares, err := appshares.Split(data, true)
if err != nil {
return nil, err
}
size := utils.SquareSize(len(shares))
return da.ExtendShares(size, appshares.ToBytes(shares))
}
52 changes: 40 additions & 12 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@ import (
"context"
"fmt"

"github.com/ipfs/go-blockservice"

"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
"github.com/celestiaorg/celestia-node/share/eds"
)

type Exchange struct {
fetcher *BlockFetcher
shareStore blockservice.BlockService
construct header.ConstructFn
fetcher *BlockFetcher
store *eds.Store
construct header.ConstructFn
}

func NewExchange(
fetcher *BlockFetcher,
bServ blockservice.BlockService,
store *eds.Store,
construct header.ConstructFn,
) *Exchange {
return &Exchange{
fetcher: fetcher,
shareStore: bServ,
construct: construct,
fetcher: fetcher,
store: store,
construct: construct,
}
}

Expand Down Expand Up @@ -86,15 +85,27 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return nil, err
}

eh, err := ce.construct(ctx, block, comm, vals, ce.shareStore)
// extend block data
eds, err := extendBlock(block.Data)
if err != nil {
return nil, err
}
// construct extended header
eh, err := ce.construct(ctx, block, comm, vals, eds)
if err != nil {
return nil, err
}

// verify hashes match
if !bytes.Equal(hash, eh.Hash()) {
return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, eh.Hash())
}
// store extended block if it is not empty
if eds != nil {
err = ce.store.Put(ctx, eh.DAH.Hash(), eds)
if err != nil {
return nil, err
}
}

return eh, nil
}
Expand All @@ -115,5 +126,22 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
return nil, err
}

return ce.construct(ctx, b, comm, vals, ce.shareStore)
// extend block data
eds, err := extendBlock(b.Data)
if err != nil {
return nil, err
}
// create extended header
eh, err := ce.construct(ctx, b, comm, vals, eds)
if err != nil {
return nil, err
}
// only store extended block if it's not empty
if eds != nil {
err = ce.store.Put(ctx, eh.DAH.Hash(), eds)
if err != nil {
return nil, err
}
}
return eh, nil
}
34 changes: 22 additions & 12 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,51 @@
package core

import (
"bytes"
"context"
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/testutil/testnode"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/eds"
)

func TestCoreExchange_RequestHeaders(t *testing.T) {
fetcher := createCoreFetcher(t)
store := mdutils.Bserv()
fetcher, _ := createCoreFetcher(t, DefaultTestConfig())

// generate 10 blocks
generateBlocks(t, fetcher)

store := createStore(t)

ce := NewExchange(fetcher, store, header.MakeExtendedHeader)
headers, err := ce.GetRangeByHeight(context.Background(), 1, 10)
require.NoError(t, err)

assert.Equal(t, 10, len(headers))
}

func Test_hashMatch(t *testing.T) {
expected := []byte("AE0F153556A4FA5C0B7C3BFE0BAF0EC780C031933B281A8D759BB34C1DA31C56")
mismatch := []byte("57A0D7FE69FE88B3D277C824B3ACB9B60E5E65837A802485DE5CBB278C43576A")

assert.False(t, bytes.Equal(expected, mismatch))
func createCoreFetcher(t *testing.T, cfg *TestConfig) (*BlockFetcher, testnode.Context) {
cctx := StartTestNodeWithConfig(t, cfg)
// wait for height 2 in order to be able to start submitting txs (this prevents
// flakiness with accessing account state)
_, err := cctx.WaitForHeightWithTimeout(2, time.Second) // TODO @renaynay: configure?
require.NoError(t, err)
return NewBlockFetcher(cctx.Client), cctx
}

func createCoreFetcher(t *testing.T) *BlockFetcher {
client := StartTestNode(t).Client
return NewBlockFetcher(client)
func createStore(t *testing.T) *eds.Store {
t.Helper()

store, err := eds.NewStore(t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)
return store
}

func generateBlocks(t *testing.T, fetcher *BlockFetcher) {
Expand Down
18 changes: 9 additions & 9 deletions headertest/header_test.go → core/header_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
package headertest
package core

import (
"context"
"testing"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/rand"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/headertest"
)

func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

client := core.StartTestNode(t).Client
fetcher := core.NewBlockFetcher(client)

store := mdutils.Bserv()
client := StartTestNode(t).Client
fetcher := NewBlockFetcher(client)

sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
Expand All @@ -33,14 +30,17 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {
comm, val, err := fetcher.GetBlockInfo(ctx, &height)
require.NoError(t, err)

headerExt, err := header.MakeExtendedHeader(ctx, b, comm, val, store)
eds, err := extendBlock(b.Data)
require.NoError(t, err)

headerExt, err := header.MakeExtendedHeader(ctx, b, comm, val, eds)
require.NoError(t, err)

assert.Equal(t, header.EmptyDAH(), *headerExt.DAH)
}

func TestMismatchedDataHash_ComputedRoot(t *testing.T) {
header := RandExtendedHeader(t)
header := headertest.RandExtendedHeader(t)

header.DataHash = rand.Bytes(32)

Expand Down
45 changes: 30 additions & 15 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"fmt"

"github.com/ipfs/go-blockservice"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

Expand All @@ -22,9 +22,9 @@ import (
// network.
type Listener struct {
fetcher *BlockFetcher
bServ blockservice.BlockService

construct header.ConstructFn
store *eds.Store

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
Expand All @@ -35,16 +35,16 @@ type Listener struct {
func NewListener(
bcast libhead.Broadcaster[*header.ExtendedHeader],
fetcher *BlockFetcher,
extHeaderBroadcaster shrexsub.BroadcastFn,
bServ blockservice.BlockService,
hashBroadcaster shrexsub.BroadcastFn,
construct header.ConstructFn,
store *eds.Store,
) *Listener {
return &Listener{
headerBroadcaster: bcast,
fetcher: fetcher,
hashBroadcaster: extHeaderBroadcaster,
bServ: bServ,
headerBroadcaster: bcast,
hashBroadcaster: hashBroadcaster,
construct: construct,
store: store,
}
}

Expand All @@ -65,7 +65,7 @@ func (cl *Listener) Start(ctx context.Context) error {
return nil
}

// Stop stops the Listener listener loop.
// Stop stops the listener loop.
func (cl *Listener) Stop(ctx context.Context) error {
cl.cancel()
cl.cancel = nil
Expand Down Expand Up @@ -96,17 +96,25 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan *types.Block) {
return
}

eh, err := cl.construct(ctx, b, comm, vals, cl.bServ)
// extend block data
eds, err := extendBlock(b.Data)
if err != nil {
log.Errorw("listener: making extended header", "err", err)
log.Errorw("listener: extending block data", "err", err)
return
}

// broadcast new ExtendedHeader, but if core is still syncing, notify only local subscribers
err = cl.headerBroadcaster.Broadcast(ctx, eh, pubsub.WithLocalPublication(syncing))
// generate extended header
eh, err := cl.construct(ctx, b, comm, vals, eds)
if err != nil {
log.Errorw("listener: broadcasting next header", "height", eh.Height(),
"err", err)
log.Errorw("listener: making extended header", "err", err)
return
}
// store block data if not empty
if eds != nil {
err = cl.store.Put(ctx, eh.DAH.Hash(), eds)
if err != nil {
log.Errorw("listener: storing extended header", "err", err)
return
}
}

// notify network of new EDS hash only if core is already synced
Expand All @@ -117,6 +125,13 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan *types.Block) {
"hash", eh.Hash(), "err", err)
}
}

// broadcast new ExtendedHeader, but if core is still syncing, notify only local subscribers
err = cl.headerBroadcaster.Broadcast(ctx, eh, pubsub.WithLocalPublication(syncing))
if err != nil {
log.Errorw("listener: broadcasting next header", "height", eh.Height(),
"err", err)
}
case <-ctx.Done():
return
}
Expand Down
Loading

0 comments on commit e0ee3b9

Please sign in to comment.