diff --git a/core/client.go b/core/client.go deleted file mode 100644 index 9636619b02..0000000000 --- a/core/client.go +++ /dev/null @@ -1,26 +0,0 @@ -package core - -import ( - "fmt" - - retryhttp "github.com/hashicorp/go-retryablehttp" - "github.com/tendermint/tendermint/rpc/client" - "github.com/tendermint/tendermint/rpc/client/http" -) - -// Client is an alias to Core Client. -type Client = client.Client - -// NewRemote creates a new Client that communicates with a remote Core endpoint over HTTP. -func NewRemote(ip, port string) (Client, error) { - httpClient := retryhttp.NewClient() - httpClient.RetryMax = 2 - // suppress logging - httpClient.Logger = nil - - return http.NewWithClient( - fmt.Sprintf("tcp://%s:%s", ip, port), - "/websocket", - httpClient.StandardClient(), - ) -} diff --git a/core/eds.go b/core/eds.go index 2e8ce7ea19..6a4fb15cbf 100644 --- a/core/eds.go +++ b/core/eds.go @@ -24,8 +24,8 @@ import ( // extendBlock extends the given block data, returning the resulting // ExtendedDataSquare (EDS). If there are no transactions in the block, // nil is returned in place of the eds. -func extendBlock(data types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) { - if app.IsEmptyBlockRef(&data, appVersion) { +func extendBlock(data *types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) { + if app.IsEmptyBlockRef(data, appVersion) { return share.EmptyEDS(), nil } diff --git a/core/eds_test.go b/core/eds_test.go index 3bed7ea94e..f3ff767860 100644 --- a/core/eds_test.go +++ b/core/eds_test.go @@ -22,7 +22,7 @@ func TestTrulyEmptySquare(t *testing.T) { SquareSize: 1, } - eds, err := extendBlock(data, appconsts.LatestVersion) + eds, err := extendBlock(&data, appconsts.LatestVersion) require.NoError(t, err) require.True(t, eds.Equals(share.EmptyEDS())) } @@ -38,7 +38,7 @@ func TestEmptySquareWithZeroTxs(t *testing.T) { Txs: []types.Tx{}, } - eds, err := extendBlock(data, appconsts.LatestVersion) + eds, err := extendBlock(&data, appconsts.LatestVersion) require.NoError(t, err) require.True(t, eds.Equals(share.EmptyEDS())) diff --git a/core/exchange.go b/core/exchange.go index 372906ff85..4343d683c4 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/tendermint/tendermint/types" "golang.org/x/sync/errgroup" libhead "github.com/celestiaorg/go-header" @@ -62,8 +61,7 @@ func NewExchange( func (ce *Exchange) GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { log.Debugw("requesting header", "height", height) - intHeight := int64(height) - return ce.getExtendedHeaderByHeight(ctx, &intHeight) + return ce.getExtendedHeaderByHeight(ctx, int64(height)) } func (ce *Exchange) GetRangeByHeight( @@ -129,12 +127,12 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende return nil, fmt.Errorf("fetching block by hash %s: %w", hash.String(), err) } - comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height) + comm, vals, err := ce.fetcher.GetBlockInfo(ctx, block.Height) if err != nil { return nil, fmt.Errorf("fetching block info for height %d: %w", &block.Height, err) } - eds, err := extendBlock(block.Data, block.Header.Version.App) + eds, err := extendBlock(&block.Data, block.Header.Version.App) if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err) } @@ -162,16 +160,13 @@ func (ce *Exchange) Head( _ ...libhead.HeadOption[*header.ExtendedHeader], ) (*header.ExtendedHeader, error) { log.Debug("requesting head") - return ce.getExtendedHeaderByHeight(ctx, nil) + return ce.getExtendedHeaderByHeight(ctx, 0) } -func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*header.ExtendedHeader, error) { +func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height int64) (*header.ExtendedHeader, error) { b, err := ce.fetcher.GetSignedBlock(ctx, height) if err != nil { - if height == nil { - return nil, fmt.Errorf("fetching signed block for head from core: %w", err) - } - return nil, fmt.Errorf("fetching signed block at height %d from core: %w", *height, err) + return nil, fmt.Errorf("fetching signed block at height %d from core: %w", height, err) } log.Debugw("fetched signed block from core", "height", b.Header.Height) @@ -179,12 +174,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err) } - - // TODO(@Wondertan): This is a hack to deref Data, allowing GC to pick it up. - // The better footgun-less solution is to change core.ResultSignedBlock fields to be pointers instead of values. - b.Data = types.Data{} - - eh, err := ce.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds) + // create extended header + eh, err := ce.construct(b.Header, b.Commit, b.ValidatorSet, eds) if err != nil { panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)) } diff --git a/core/exchange_test.go b/core/exchange_test.go index a2187ed7c8..54b07ec914 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -3,6 +3,7 @@ package core import ( "bytes" "context" + "net" "testing" "time" @@ -34,7 +35,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { // initialize store with genesis block genHeight := int64(1) - genBlock, err := fetcher.GetBlock(ctx, &genHeight) + genBlock, err := fetcher.GetBlock(ctx, genHeight) require.NoError(t, err) genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) @@ -61,6 +62,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { require.NoError(t, err) assert.True(t, has) } + require.NoError(t, fetcher.Stop(ctx)) } // TestExchange_DoNotStoreHistoric tests that the CoreExchange will not @@ -87,7 +89,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) { // initialize store with genesis block genHeight := int64(1) - genBlock, err := fetcher.GetBlock(ctx, &genHeight) + genBlock, err := fetcher.GetBlock(ctx, genHeight) require.NoError(t, err) genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) @@ -136,7 +138,7 @@ func TestExchange_StoreHistoricIfArchival(t *testing.T) { // initialize store with genesis block genHeight := int64(1) - genBlock, err := fetcher.GetBlock(ctx, &genHeight) + genBlock, err := fetcher.GetBlock(ctx, genHeight) require.NoError(t, err) genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) @@ -166,7 +168,12 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn // flakiness with accessing account state) _, err := cctx.WaitForHeightWithTimeout(2, time.Second*2) // TODO @renaynay: configure? require.NoError(t, err) - return NewBlockFetcher(cctx.Client), cctx + host, port, err := net.SplitHostPort(cctx.GRPCClient.Target()) + require.NoError(t, err) + client := newTestClient(t, host, port) + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) + return fetcher, cctx } // fillBlocks fills blocks until the context is canceled. @@ -202,7 +209,7 @@ func generateNonEmptyBlocks( sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) defer func() { - err = fetcher.UnsubscribeNewBlockEvent(ctx) + err = fetcher.Stop(ctx) require.NoError(t, err) }() diff --git a/core/fetcher.go b/core/fetcher.go index f2b160e108..6f049cc7ac 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -2,39 +2,64 @@ package core import ( "context" - "errors" "fmt" + "io" + "sync/atomic" + "time" + "github.com/gogo/protobuf/proto" logging "github.com/ipfs/go-log/v2" - coretypes "github.com/tendermint/tendermint/rpc/core/types" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" + "google.golang.org/grpc" libhead "github.com/celestiaorg/go-header" ) const newBlockSubscriber = "NewBlock/Events" +type SignedBlock struct { + Header *types.Header `json:"header"` + Commit *types.Commit `json:"commit"` + Data *types.Data `json:"data"` + ValidatorSet *types.ValidatorSet `json:"validator_set"` +} + var ( log = logging.Logger("core") newDataSignedBlockQuery = types.QueryForEvent(types.EventSignedBlock).String() ) type BlockFetcher struct { - client Client + client coregrpc.BlockAPIClient - doneCh chan struct{} - cancel context.CancelFunc + doneCh chan struct{} + cancel context.CancelFunc + isListeningForBlocks atomic.Bool } // NewBlockFetcher returns a new `BlockFetcher`. -func NewBlockFetcher(client Client) *BlockFetcher { +func NewBlockFetcher(conn *grpc.ClientConn) (*BlockFetcher, error) { return &BlockFetcher{ - client: client, + client: coregrpc.NewBlockAPIClient(conn), + }, nil +} + +// Stop stops the block fetcher. +// The underlying gRPC connection needs to be stopped separately. +func (f *BlockFetcher) Stop(ctx context.Context) error { + f.cancel() + select { + case <-f.doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) } } // GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. -func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) { +func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) { commit, err := f.Commit(ctx, height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err) @@ -45,7 +70,7 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types. // commit and getting the latest validator set. Therefore, it is // best to get the validator set at the latest commit's height to // prevent this potential inconsistency. - valSet, err := f.ValidatorSet(ctx, &commit.Height) + valSet, err := f.ValidatorSet(ctx, commit.Height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting validator set at height %d: %w", height, err) } @@ -54,41 +79,50 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types. } // GetBlock queries Core for a `Block` at the given height. -func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) { - res, err := f.client.Block(ctx, height) +// if the height is nil, use the latest height +func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock, error) { + stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err } - - if res != nil && res.Block == nil { - return nil, fmt.Errorf("core/fetcher: block not found, height: %d", height) + block, err := receiveBlockByHeight(stream) + if err != nil { + return nil, err } - - return res.Block, nil + return block, nil } func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) { - res, err := f.client.BlockByHash(ctx, hash) + if hash == nil { + return nil, fmt.Errorf("cannot get block with nil hash") + } + stream, err := f.client.BlockByHash(ctx, &coregrpc.BlockByHashRequest{Hash: hash}) if err != nil { return nil, err } - - if res != nil && res.Block == nil { - return nil, fmt.Errorf("core/fetcher: block not found, hash: %s", hash.String()) + block, err := receiveBlockByHash(stream) + if err != nil { + return nil, err } - return res.Block, nil + return block, nil } // GetSignedBlock queries Core for a `Block` at the given height. -func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height *int64) (*coretypes.ResultSignedBlock, error) { - return f.client.SignedBlock(ctx, height) +// if the height is nil, use the latest height. +func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) { + stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) + if err != nil { + return nil, err + } + return receiveBlockByHeight(stream) } // Commit queries Core for a `Commit` from the block at // the given height. -func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit, error) { - res, err := f.client.Commit(ctx, height) +// If the height is nil, use the latest height. +func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) { + res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height}) if err != nil { return nil, err } @@ -97,45 +131,47 @@ func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit return nil, fmt.Errorf("core/fetcher: commit not found at height %d", height) } - return res.Commit, nil + commit, err := types.CommitFromProto(res.Commit) + if err != nil { + return nil, err + } + + return commit, nil } // ValidatorSet queries Core for the ValidatorSet from the // block at the given height. -func (f *BlockFetcher) ValidatorSet(ctx context.Context, height *int64) (*types.ValidatorSet, error) { - perPage := 100 - - vals, total := make([]*types.Validator, 0), -1 - for page := 1; len(vals) != total; page++ { - res, err := f.client.Validators(ctx, height, &page, &perPage) - if err != nil { - return nil, err - } +// If the height is nil, use the latest height. +func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) { + res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height}) + if err != nil { + return nil, err + } - if res != nil && len(res.Validators) == 0 { - return nil, fmt.Errorf("core/fetcher: validator set not found at height %d", height) - } + if res != nil && res.ValidatorSet == nil { + return nil, fmt.Errorf("core/fetcher: validator set not found at height %d", height) + } - total = res.Total - vals = append(vals, res.Validators...) + validatorSet, err := types.ValidatorSetFromProto(res.ValidatorSet) + if err != nil { + return nil, err } - return types.NewValidatorSet(vals), nil + return validatorSet, nil } // SubscribeNewBlockEvent subscribes to new block events from Core, returning // a new block event channel on success. func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { - // start the client if not started yet - if !f.client.IsRunning() { - return nil, errors.New("client not running") + if f.isListeningForBlocks.Load() { + return nil, fmt.Errorf("already subscribed to new blocks") } - ctx, cancel := context.WithCancel(ctx) f.cancel = cancel f.doneCh = make(chan struct{}) + f.isListeningForBlocks.Store(true) - eventChan, err := f.client.Subscribe(ctx, newBlockSubscriber, newDataSignedBlockQuery) + subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{}) if err != nil { return nil, err } @@ -144,18 +180,34 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types go func() { defer close(f.doneCh) defer close(signedBlockCh) + defer func() { f.isListeningForBlocks.Store(false) }() for { select { case <-ctx.Done(): return - case newEvent, ok := <-eventChan: - if !ok { - log.Errorw("fetcher: new blocks subscription channel closed unexpectedly") - return + default: + resp, err := subscription.Recv() + if err != nil { + log.Errorw("fetcher: error receiving new height", "err", err.Error()) + continue + } + withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) + signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height) + ctxCancel() + if err != nil { + log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) + // sleeping a bit to avoid retrying instantly and give time for the gRPC connection + // to recover automatically. + time.Sleep(time.Second) + continue } - signedBlock := newEvent.Data.(types.EventDataSignedBlock) select { - case signedBlockCh <- signedBlock: + case signedBlockCh <- types.EventDataSignedBlock{ + Header: *signedBlock.Header, + Commit: *signedBlock.Commit, + ValidatorSet: *signedBlock.ValidatorSet, + Data: *signedBlock.Data, + }: case <-ctx.Done(): return } @@ -166,24 +218,101 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types return signedBlockCh, nil } -// UnsubscribeNewBlockEvent stops the subscription to new block events from Core. -func (f *BlockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error { - f.cancel() - select { - case <-f.doneCh: - case <-ctx.Done(): - return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) - } - return f.client.Unsubscribe(ctx, newBlockSubscriber, newDataSignedBlockQuery) -} - // IsSyncing returns the sync status of the Core connection: true for // syncing, and false for already caught up. It can also return an error // in the case of a failed status request. func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { - resp, err := f.client.Status(ctx) + resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) if err != nil { return false, err } return resp.SyncInfo.CatchingUp, nil } + +func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) ( + *SignedBlock, + error, +) { + parts := make([]*tmproto.Part, 0) + + // receive the first part to get the block meta, commit, and validator set + firstPart, err := streamer.Recv() + if err != nil { + return nil, err + } + commit, err := types.CommitFromProto(firstPart.Commit) + if err != nil { + return nil, err + } + validatorSet, err := types.ValidatorSetFromProto(firstPart.ValidatorSet) + if err != nil { + return nil, err + } + parts = append(parts, firstPart.BlockPart) + + // receive the rest of the block + isLast := firstPart.IsLast + for !isLast { + resp, err := streamer.Recv() + if err != nil { + return nil, err + } + parts = append(parts, resp.BlockPart) + isLast = resp.IsLast + } + block, err := partsToBlock(parts) + if err != nil { + return nil, err + } + return &SignedBlock{ + Header: &block.Header, + Commit: commit, + Data: &block.Data, + ValidatorSet: validatorSet, + }, nil +} + +func receiveBlockByHash(streamer coregrpc.BlockAPI_BlockByHashClient) (*types.Block, error) { + parts := make([]*tmproto.Part, 0) + isLast := false + for !isLast { + resp, err := streamer.Recv() + if err != nil { + return nil, err + } + parts = append(parts, resp.BlockPart) + isLast = resp.IsLast + } + return partsToBlock(parts) +} + +// partsToBlock takes a slice of parts and generates the corresponding block. +// It empties the slice to optimize the memory usage. +func partsToBlock(parts []*tmproto.Part) (*types.Block, error) { + partSet := types.NewPartSetFromHeader(types.PartSetHeader{ + Total: uint32(len(parts)), + }) + for _, part := range parts { + ok, err := partSet.AddPartWithoutProof(&types.Part{Index: part.Index, Bytes: part.Bytes}) + if err != nil { + return nil, err + } + if !ok { + return nil, err + } + } + pbb := new(tmproto.Block) + bz, err := io.ReadAll(partSet.GetReader()) + if err != nil { + return nil, err + } + err = proto.Unmarshal(bz, pbb) + if err != nil { + return nil, err + } + block, err := types.BlockFromProto(pbb) + if err != nil { + return nil, err + } + return block, nil +} diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index 8b3af8e5e1..d184fb8b91 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -4,6 +4,7 @@ package core import ( "context" + "net" "testing" "time" @@ -18,8 +19,12 @@ func TestBlockFetcherHeaderValues(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) + node := StartTestNode(t) + host, port, err := net.SplitHostPort(node.GRPCClient.Target()) + require.NoError(t, err) + client := newTestClient(t, host, port) + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) @@ -33,10 +38,10 @@ func TestBlockFetcherHeaderValues(t *testing.T) { require.NoError(t, ctx.Err()) } // get Commit from current height - commit, err := fetcher.Commit(ctx, &h) + commit, err := fetcher.Commit(ctx, h) require.NoError(t, err) // get ValidatorSet from current height - valSet, err := fetcher.ValidatorSet(ctx, &h) + valSet, err := fetcher.ValidatorSet(ctx, h) require.NoError(t, err) // get next block var nextBlock types.EventDataSignedBlock @@ -51,5 +56,5 @@ func TestBlockFetcherHeaderValues(t *testing.T) { // compare ValidatorSet hash to the ValidatorsHash from first block height hexBytes := valSet.Hash() assert.Equal(t, nextBlock.ValidatorSet.Hash(), hexBytes) - require.NoError(t, fetcher.UnsubscribeNewBlockEvent(ctx)) + require.NoError(t, fetcher.Stop(ctx)) } diff --git a/core/fetcher_test.go b/core/fetcher_test.go index 42afa42bcd..8d7659494d 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -2,6 +2,7 @@ package core import ( "context" + "net" "testing" "time" @@ -13,9 +14,11 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) - + host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + client := newTestClient(t, host, port) + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) @@ -24,16 +27,16 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { select { case newBlockFromChan := <-newBlockChan: h := newBlockFromChan.Header.Height - block, err := fetcher.GetSignedBlock(ctx, &h) + block, err := fetcher.GetSignedBlock(ctx, h) require.NoError(t, err) - assert.Equal(t, newBlockFromChan.Data, block.Data) - assert.Equal(t, newBlockFromChan.Header, block.Header) - assert.Equal(t, newBlockFromChan.Commit, block.Commit) - assert.Equal(t, newBlockFromChan.ValidatorSet, block.ValidatorSet) + assert.Equal(t, newBlockFromChan.Data, *block.Data) + assert.Equal(t, newBlockFromChan.Header, *block.Header) + assert.Equal(t, newBlockFromChan.Commit, *block.Commit) + assert.Equal(t, newBlockFromChan.ValidatorSet, *block.ValidatorSet) require.GreaterOrEqual(t, newBlockFromChan.Header.Height, int64(i)) case <-ctx.Done(): require.NoError(t, ctx.Err()) } } - require.NoError(t, fetcher.UnsubscribeNewBlockEvent(ctx)) + require.NoError(t, fetcher.Stop(ctx)) } diff --git a/core/header_test.go b/core/header_test.go index 7b7eb3a7b7..dcc5dba9e2 100644 --- a/core/header_test.go +++ b/core/header_test.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "net" "testing" "github.com/stretchr/testify/assert" @@ -20,24 +21,26 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) - + host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + client := newTestClient(t, host, port) + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) <-sub height := int64(1) - b, err := fetcher.GetBlock(ctx, &height) + b, err := fetcher.GetBlock(ctx, height) require.NoError(t, err) - comm, val, err := fetcher.GetBlockInfo(ctx, &height) + comm, val, err := fetcher.GetBlockInfo(ctx, height) require.NoError(t, err) eds, err := extendBlock(b.Data, b.Header.Version.App) require.NoError(t, err) - headerExt, err := header.MakeExtendedHeader(&b.Header, comm, val, eds) + headerExt, err := header.MakeExtendedHeader(b.Header, comm, val, eds) require.NoError(t, err) assert.Equal(t, share.EmptyEDSRoots(), headerExt.DAH) diff --git a/core/listener.go b/core/listener.go index d403421175..c020b84758 100644 --- a/core/listener.go +++ b/core/listener.go @@ -111,9 +111,9 @@ func (cl *Listener) Start(context.Context) error { // Stop stops the listener loop. func (cl *Listener) Stop(ctx context.Context) error { - err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) + err := cl.fetcher.Stop(ctx) if err != nil { - log.Warnw("listener: unsubscribing from new block event", "err", err) + log.Warnw("listener: stopping gRPC block event", "err", err) } cl.cancel() @@ -156,7 +156,7 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat } func (cl *Listener) resubscribe(ctx context.Context) <-chan types.EventDataSignedBlock { - err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) + err := cl.fetcher.Stop(ctx) if err != nil { log.Warnw("listener: unsubscribe", "err", err) } @@ -228,7 +228,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS attribute.Int64("height", b.Header.Height), ) - eds, err := extendBlock(b.Data, b.Header.Version.App) + eds, err := extendBlock(&b.Data, b.Header.Version.App) if err != nil { return fmt.Errorf("extending block data: %w", err) } diff --git a/core/testing.go b/core/testing.go index 6d2aa8cc36..586bf57f83 100644 --- a/core/testing.go +++ b/core/testing.go @@ -1,14 +1,16 @@ package core import ( + "context" "net" - "net/url" "testing" "time" "github.com/stretchr/testify/require" - tmconfig "github.com/tendermint/tendermint/config" tmrand "github.com/tendermint/tendermint/libs/rand" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/test/util/genesis" "github.com/celestiaorg/celestia-app/v3/test/util/testnode" @@ -50,37 +52,12 @@ func StartTestNode(t *testing.T) testnode.Context { func StartTestNodeWithConfig(t *testing.T, cfg *testnode.Config) testnode.Context { cctx, _, _ := testnode.NewNetwork(t, cfg) // we want to test over remote http client, - // so we are as close to the real environment as possible - // however, it might be useful to use local tendermint client - // if you need to debug something inside of it - ip, port, err := getEndpoint(cfg.TmConfig) - require.NoError(t, err) - client, err := NewRemote(ip, port) - require.NoError(t, err) - - err = client.Start() - require.NoError(t, err) - t.Cleanup(func() { - err := client.Stop() - require.NoError(t, err) - }) - - cctx.WithClient(client) + // so we are as close to the real environment as possible, + // however, it might be useful to use a local tendermint client + // if you need to debug something inside it return cctx } -func getEndpoint(cfg *tmconfig.Config) (string, string, error) { - url, err := url.Parse(cfg.RPC.ListenAddress) - if err != nil { - return "", "", err - } - host, _, err := net.SplitHostPort(url.Host) - if err != nil { - return "", "", err - } - return host, url.Port(), nil -} - // generateRandomAccounts generates n random account names. func generateRandomAccounts(n int) []string { accounts := make([]string, n) @@ -89,3 +66,16 @@ func generateRandomAccounts(n int) []string { } return accounts } + +func newTestClient(t *testing.T, ip, port string) *grpc.ClientConn { + t.Helper() + opt := grpc.WithTransportCredentials(insecure.NewCredentials()) + endpoint := net.JoinHostPort(ip, port) + client, err := grpc.NewClient(endpoint, opt) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + t.Cleanup(cancel) + ready := client.WaitForStateChange(ctx, connectivity.Ready) + require.True(t, ready) + return client +} diff --git a/go.mod b/go.mod index b2ad5e8a62..495934f61b 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b github.com/benbjohnson/clock v1.3.5 - github.com/celestiaorg/celestia-app/v3 v3.0.2 + github.com/celestiaorg/celestia-app/v3 v3.3.0-rc0 github.com/celestiaorg/go-fraud v0.2.1 github.com/celestiaorg/go-header v0.6.3 github.com/celestiaorg/go-libp2p-messenger v0.2.0 @@ -27,7 +27,6 @@ require ( github.com/gorilla/mux v1.8.1 github.com/grafana/otel-profiling-go v0.5.1 github.com/grafana/pyroscope-go v1.1.2 - github.com/hashicorp/go-retryablehttp v0.7.7 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/imdario/mergo v0.3.16 github.com/ipfs/boxo v0.24.0 @@ -56,7 +55,7 @@ require ( github.com/rollkit/go-da v0.8.0 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/tendermint/tendermint v0.34.29 go.opentelemetry.io/contrib/instrumentation/runtime v0.45.0 go.opentelemetry.io/otel v1.31.0 @@ -70,10 +69,10 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 go.uber.org/fx v1.23.0 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.28.0 + golang.org/x/crypto v0.31.0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c - golang.org/x/sync v0.8.0 - golang.org/x/text v0.19.0 + golang.org/x/sync v0.10.0 + golang.org/x/text v0.21.0 google.golang.org/grpc v1.68.0 google.golang.org/protobuf v1.35.1 ) @@ -129,10 +128,9 @@ require ( github.com/cosmos/iavl v0.19.6 // indirect github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2 // indirect github.com/cosmos/ibc-go/v6 v6.3.0 // indirect - github.com/cosmos/ledger-cosmos-go v0.13.2 // indirect + github.com/cosmos/ledger-cosmos-go v0.14.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c // indirect github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect - github.com/creachadair/taskgroup v0.3.2 // indirect github.com/cskr/pubsub v1.0.2 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -332,10 +330,10 @@ require ( go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.30.0 // indirect + golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sys v0.26.0 // indirect - golang.org/x/term v0.25.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/term v0.27.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.26.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect @@ -354,12 +352,12 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 + github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16 github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35 ) replace github.com/ipfs/boxo => github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b diff --git a/go.sum b/go.sum index bbb12f2221..5715acbcd2 100644 --- a/go.sum +++ b/go.sum @@ -347,12 +347,12 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b h1:M9X7s1WJ/7Ju84ZUbO/6/8XlODkFsj/ln85AE0F6pj8= github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b/go.mod h1:OpUrJtGmZZktUqJvPOtmP8wSfEFcdF/55d3PNCcYLwc= -github.com/celestiaorg/celestia-app/v3 v3.0.2 h1:N9KOGcedhbQpK4XfDZ/OG5za/bV94N4QE72o4gSZ+EA= -github.com/celestiaorg/celestia-app/v3 v3.0.2/go.mod h1:Ut3ytZG2+RcmeCxrYyJ5KOGaFoGnVcShIN+IufyDDSY= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 h1:L4GTm+JUXhB0a/nGPMq6jEqqe6THuYSQ8m2kUCtZYqw= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= -github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 h1:f+fTe7GGk0/qgdzyqB8kk8EcDf9d6MC22khBTQiDXsU= -github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16/go.mod h1:07Z8HJqS8Rw4XlZ+ok3D3NM/X/in8mvcGLvl0Zb5wrA= +github.com/celestiaorg/celestia-app/v3 v3.3.0-rc0 h1:YKhBIUAt6CZjxZijql73mBszuddJyVSTyQd/3ZBxxcc= +github.com/celestiaorg/celestia-app/v3 v3.3.0-rc0/go.mod h1:MKhiQgATDdLouzC5KvXDAnDpEgIXyD0MNiq0ChrWFco= +github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35 h1:T21AhezjcByAlWDHmiVbpg743Uqk/dqBzJkQsAnbQf8= +github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35/go.mod h1:fQ46s1hYFTGFBsHsuGsbxDZ720ZPQow5Iyqw+yErZSo= +github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16 h1:qxWiGrDEcg4FzVTpIXU/v3wjP7q1Lz4AMhSBBRABInU= +github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16/go.mod h1:W30mNt3+2l516HVR8Gt9+Gf4qOrWC9/x18MTEx2GljE= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= github.com/celestiaorg/go-fraud v0.2.1/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= github.com/celestiaorg/go-header v0.6.3 h1:VI+fsNxFLeUS7cNn0LgHP6Db66uslnKp/fgMg5nxqHg= @@ -481,8 +481,8 @@ github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2 h1:Hz4 github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v6 v6.1.2/go.mod h1:Jo934o/sW7fNxuOa/TjCalSalz+1Fd649eLyANaJx8g= github.com/cosmos/ibc-go/v6 v6.3.0 h1:2EkkqDEd9hTQvzB/BsPhYZsu7T/dzAVA8+VD2UuJLSQ= github.com/cosmos/ibc-go/v6 v6.3.0/go.mod h1:Dm14j9s094bGyCEE8W4fD+2t8IneHv+cz+80Mvwjr1w= -github.com/cosmos/ledger-cosmos-go v0.13.2 h1:aY0KZSmUwNKbBm9OvbIjvf7Ozz2YzzpAbgvN2C8x2T0= -github.com/cosmos/ledger-cosmos-go v0.13.2/go.mod h1:HENcEP+VtahZFw38HZ3+LS3Iv5XV6svsnkk9vdJtLr8= +github.com/cosmos/ledger-cosmos-go v0.14.0 h1:WfCHricT3rPbkPSVKRH+L4fQGKYHuGOK9Edpel8TYpE= +github.com/cosmos/ledger-cosmos-go v0.14.0/go.mod h1:E07xCWSBl3mTGofZ2QnL4cIUzMbbGVyik84QYKbX3RA= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -493,8 +493,6 @@ github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c h1:uQYC5Z1mdLR github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c/go.mod h1:geZJZH3SzKCqnz5VT0q/DyIG/tvu/dZk+VIfXicupJs= github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI= github.com/crate-crypto/go-kzg-4844 v1.0.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc= -github.com/creachadair/taskgroup v0.3.2 h1:zlfutDS+5XG40AOxcHDSThxKzns8Tnr9jnr6VqkYlkM= -github.com/creachadair/taskgroup v0.3.2/go.mod h1:wieWwecHVzsidg2CsUnFinW1faVN4+kq+TDlRJQ0Wbk= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cristalhq/jwt/v5 v5.4.0 h1:Wxi1TocFHaijyV608j7v7B9mPc4ZNjvWT3LKBO0d4QI= @@ -600,8 +598,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpm github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/filecoin-project/go-jsonrpc v0.6.0 h1:/fFJIAN/k6EgY90m7qbyfY28woMwyseZmh2gVs5sYjY= @@ -919,8 +915,6 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-getter v1.7.5 h1:dT58k9hQ/vbxNMwoI5+xFYAJuv6152UNvdHokfI5wE4= github.com/hashicorp/go-getter v1.7.5/go.mod h1:W7TalhMmbPmsSMdNjD0ZskARur/9GJ17cfHTRtXV744= -github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= -github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= @@ -929,8 +923,6 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= -github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-safetemp v1.0.0 h1:2HR189eFNrjHQyENnQMMpCiBAsRxzbTMIgBhEyExpmo= github.com/hashicorp/go-safetemp v1.0.0/go.mod h1:oaerMy3BhqiTbVye6QuFhFtIceqFoDHxNAB65b+Rj1I= @@ -1802,8 +1794,9 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/supranational/blst v0.3.13 h1:AYeSxdOMacwu7FBmpfloBz5pbFXDmJL33RuwnKtmTjk= @@ -2021,8 +2014,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2154,8 +2147,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -2201,8 +2194,8 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2332,8 +2325,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2343,8 +2336,8 @@ golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= -golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= -golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2359,8 +2352,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/nodebuilder/config_test.go b/nodebuilder/config_test.go index 519466d88e..227fefd3c7 100644 --- a/nodebuilder/config_test.go +++ b/nodebuilder/config_test.go @@ -61,8 +61,7 @@ func TestUpdateConfig(t *testing.T) { var outdatedConfig = ` [Core] IP = "0.0.0.0" - RPCPort = "0" - GRPCPort = "0" + Port = "0" [State] DefaultKeyName = "thisshouldnthavechanged" diff --git a/nodebuilder/core/config.go b/nodebuilder/core/config.go index b82be1d809..53472069de 100644 --- a/nodebuilder/core/config.go +++ b/nodebuilder/core/config.go @@ -8,26 +8,30 @@ import ( ) const ( - DefaultRPCPort = "26657" - DefaultGRPCPort = "9090" + DefaultPort = "9090" ) var MetricsEnabled bool // Config combines all configuration fields for managing the relationship with a Core node. type Config struct { - IP string - RPCPort string - GRPCPort string + IP string + Port string + // TLSEnabled specifies whether the connection is secure or not. + // PLEASE NOTE: it should be set to true in order to handle XTokenPath. + TLSEnabled bool + // XTokenPath specifies the path to the directory with JSON file containing the X-Token for gRPC authentication. + // The JSON file should have a key-value pair where the key is "x-token" and the value is the authentication token. + // If left empty, the client will not include the X-Token in its requests. + XTokenPath string } // DefaultConfig returns default configuration for managing the // node's connection to a Celestia-Core endpoint. func DefaultConfig() Config { return Config{ - IP: "", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "", + Port: DefaultPort, } } @@ -37,10 +41,7 @@ func (cfg *Config) Validate() error { return nil } - if cfg.RPCPort == "" { - return fmt.Errorf("nodebuilder/core: rpc port is not set") - } - if cfg.GRPCPort == "" { + if cfg.Port == "" { return fmt.Errorf("nodebuilder/core: grpc port is not set") } @@ -49,11 +50,7 @@ func (cfg *Config) Validate() error { return err } cfg.IP = ip - _, err = strconv.Atoi(cfg.RPCPort) - if err != nil { - return fmt.Errorf("nodebuilder/core: invalid rpc port: %s", err.Error()) - } - _, err = strconv.Atoi(cfg.GRPCPort) + _, err = strconv.Atoi(cfg.Port) if err != nil { return fmt.Errorf("nodebuilder/core: invalid grpc port: %s", err.Error()) } diff --git a/nodebuilder/core/config_test.go b/nodebuilder/core/config_test.go index 6535e28807..90994d6e15 100644 --- a/nodebuilder/core/config_test.go +++ b/nodebuilder/core/config_test.go @@ -15,9 +15,8 @@ func TestValidate(t *testing.T) { { name: "valid config", cfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectErr: false, }, @@ -29,52 +28,31 @@ func TestValidate(t *testing.T) { { name: "hostname preserved", cfg: Config{ - IP: "celestia.org", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "celestia.org", + Port: DefaultPort, }, expectErr: false, }, - { - name: "missing RPC port", - cfg: Config{ - IP: "127.0.0.1", - GRPCPort: DefaultGRPCPort, - }, - expectErr: true, - }, { name: "missing GRPC port", cfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, + IP: "127.0.0.1", }, expectErr: true, }, { name: "invalid IP, but will be accepted as host and not raise an error", cfg: Config{ - IP: "invalid-ip", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "invalid-ip", + Port: DefaultPort, }, expectErr: false, }, { - name: "invalid RPC port", - cfg: Config{ - IP: "127.0.0.1", - RPCPort: "invalid-port", - GRPCPort: DefaultGRPCPort, - }, - expectErr: true, - }, - { - name: "invalid GRPC port", + name: "invalid port", cfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: "invalid-port", + IP: "127.0.0.1", + Port: "invalid-port", }, expectErr: true, }, diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index 53c914a041..e4e7593e3f 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -1,9 +1,115 @@ package core import ( - "github.com/celestiaorg/celestia-node/core" + "context" + "crypto/tls" + "encoding/json" + "errors" + "net" + "os" + "path/filepath" + + "go.uber.org/fx" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + + "github.com/celestiaorg/celestia-node/libs/utils" ) -func remote(cfg Config) (core.Client, error) { - return core.NewRemote(cfg.IP, cfg.RPCPort) +const xtokenFileName = "xtoken.json" + +func grpcClient(lc fx.Lifecycle, cfg Config) (*grpc.ClientConn, error) { + var opts []grpc.DialOption + if cfg.TLSEnabled { + opts = append(opts, grpc.WithTransportCredentials( + credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})), + ) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + if cfg.XTokenPath != "" { + xToken, err := parseTokenPath(cfg.XTokenPath) + if err != nil { + return nil, err + } + opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(xToken))) + opts = append(opts, grpc.WithStreamInterceptor(authStreamInterceptor(xToken))) + } + + endpoint := net.JoinHostPort(cfg.IP, cfg.Port) + conn, err := grpc.NewClient(endpoint, opts...) + if err != nil { + return nil, err + } + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + conn.Connect() + if !conn.WaitForStateChange(ctx, connectivity.Ready) { + return errors.New("couldn't connect to core endpoint") + } + return nil + }, + OnStop: func(context.Context) error { + return conn.Close() + }, + }) + return conn, nil +} + +func authInterceptor(xtoken string) grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken) + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func authStreamInterceptor(xtoken string) grpc.StreamClientInterceptor { + return func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken) + return streamer(ctx, desc, cc, method, opts...) + } +} + +// parseTokenPath retrieves the authentication token from a JSON file at the specified path. +func parseTokenPath(xtokenPath string) (string, error) { + xtokenPath = filepath.Join(xtokenPath, xtokenFileName) + exist := utils.Exists(xtokenPath) + if !exist { + return "", os.ErrNotExist + } + + token, err := os.ReadFile(xtokenPath) + if err != nil { + return "", err + } + + auth := struct { + Token string `json:"x-token"` + }{} + + err = json.Unmarshal(token, &auth) + if err != nil { + return "", err + } + if auth.Token == "" { + return "", errors.New("x-token is empty. Please setup a token or cleanup xtokenPath") + } + return auth.Token, nil } diff --git a/nodebuilder/core/flags.go b/nodebuilder/core/flags.go index 127ee5ee60..97b0757d23 100644 --- a/nodebuilder/core/flags.go +++ b/nodebuilder/core/flags.go @@ -8,9 +8,10 @@ import ( ) var ( - coreFlag = "core.ip" - coreRPCFlag = "core.rpc.port" - coreGRPCFlag = "core.grpc.port" + coreFlag = "core.ip" + coreGRPCFlag = "core.grpc.port" + coreTLS = "core.tls" + coreXTokenPathFlag = "core.xtoken.path" //nolint:gosec ) // Flags gives a set of hardcoded Core flags. @@ -22,18 +23,26 @@ func Flags() *flag.FlagSet { "", "Indicates node to connect to the given core node. "+ "Example: , 127.0.0.1. , subdomain.domain.tld "+ - "Assumes RPC port 26657 and gRPC port 9090 as default unless otherwise specified.", - ) - flags.String( - coreRPCFlag, - DefaultRPCPort, - "Set a custom RPC port for the core node connection. The --core.ip flag must also be provided.", + "Assumes gRPC port 9090 as default unless otherwise specified.", ) flags.String( coreGRPCFlag, - DefaultGRPCPort, + DefaultPort, "Set a custom gRPC port for the core node connection. The --core.ip flag must also be provided.", ) + flags.Bool( + coreTLS, + false, + "Specifies whether TLS is enabled or not. Default: false", + ) + flags.String( + coreXTokenPathFlag, + "", + "specifies the file path to the JSON file containing the X-Token for gRPC authentication. "+ + "The JSON file should have a key-value pair where the key is 'x-token' and the value is the authentication token. "+ + "NOTE: the path is parsed only if coreTLS enabled."+ + "If left empty, the client will not include the X-Token in its requests.", + ) return flags } @@ -44,22 +53,29 @@ func ParseFlags( ) error { coreIP := cmd.Flag(coreFlag).Value.String() if coreIP == "" { - if cmd.Flag(coreGRPCFlag).Changed || cmd.Flag(coreRPCFlag).Changed { - return fmt.Errorf("cannot specify RPC/gRPC ports without specifying an IP address for --core.ip") + if cmd.Flag(coreGRPCFlag).Changed { + return fmt.Errorf("cannot specify gRPC port without specifying an IP address for --core.ip") } return nil } - if cmd.Flag(coreRPCFlag).Changed { - rpc := cmd.Flag(coreRPCFlag).Value.String() - cfg.RPCPort = rpc - } - if cmd.Flag(coreGRPCFlag).Changed { grpc := cmd.Flag(coreGRPCFlag).Value.String() - cfg.GRPCPort = grpc + cfg.Port = grpc + } + + enabled, err := cmd.Flags().GetBool(coreTLS) + if err != nil { + return err } + if enabled { + cfg.TLSEnabled = true + if cmd.Flag(coreXTokenPathFlag).Changed { + path := cmd.Flag(coreXTokenPathFlag).Value.String() + cfg.XTokenPath = path + } + } cfg.IP = coreIP return cfg.Validate() } diff --git a/nodebuilder/core/flags_test.go b/nodebuilder/core/flags_test.go index ce906de037..a1a11a9a23 100644 --- a/nodebuilder/core/flags_test.go +++ b/nodebuilder/core/flags_test.go @@ -20,9 +20,8 @@ func TestParseFlags(t *testing.T) { args: []string{}, inputCfg: Config{}, expectedCfg: Config{ - IP: "", - RPCPort: "", - GRPCPort: "", + IP: "", + Port: "", }, expectError: false, }, @@ -30,13 +29,11 @@ func TestParseFlags(t *testing.T) { name: "only core.ip", args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{ - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + Port: DefaultPort, }, expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectError: false, }, @@ -45,9 +42,8 @@ func TestParseFlags(t *testing.T) { args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{}, expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectError: true, }, @@ -55,14 +51,12 @@ func TestParseFlags(t *testing.T) { name: "no flags, values from input config.toml ", args: []string{}, inputCfg: Config{ - IP: "127.162.36.1", - RPCPort: "1234", - GRPCPort: "5678", + IP: "127.162.36.1", + Port: "5678", }, expectedCfg: Config{ - IP: "127.162.36.1", - RPCPort: "1234", - GRPCPort: "5678", + IP: "127.162.36.1", + Port: "5678", }, expectError: false, }, @@ -70,27 +64,11 @@ func TestParseFlags(t *testing.T) { name: "only core.ip, with config.toml overridden defaults for ports", args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{ - RPCPort: "1234", - GRPCPort: "5678", + Port: "5678", }, expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: "1234", - GRPCPort: "5678", - }, - expectError: false, - }, - { - name: "core.ip and core.rpc.port", - args: []string{"--core.ip=127.0.0.1", "--core.rpc.port=12345"}, - inputCfg: Config{ - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, - }, - expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: "12345", - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: "5678", }, expectError: false, }, @@ -98,32 +76,14 @@ func TestParseFlags(t *testing.T) { name: "core.ip and core.grpc.port", args: []string{"--core.ip=127.0.0.1", "--core.grpc.port=54321"}, inputCfg: Config{ - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, - }, - expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: "54321", + Port: DefaultPort, }, - expectError: false, - }, - { - name: "core.ip, core.rpc.port, and core.grpc.port", - args: []string{"--core.ip=127.0.0.1", "--core.rpc.port=12345", "--core.grpc.port=54321"}, expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: "12345", - GRPCPort: "54321", + IP: "127.0.0.1", + Port: "54321", }, expectError: false, }, - { - name: "core.rpc.port without core.ip", - args: []string{"--core.rpc.port=12345"}, - expectedCfg: Config{}, - expectError: true, - }, { name: "core.grpc.port without core.ip", args: []string{"--core.grpc.port=54321"}, diff --git a/nodebuilder/core/module.go b/nodebuilder/core/module.go index 441907ce32..a81365659a 100644 --- a/nodebuilder/core/module.go +++ b/nodebuilder/core/module.go @@ -25,6 +25,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option baseComponents := fx.Options( fx.Supply(*cfg), fx.Error(cfgErr), + fx.Provide(grpcClient), fx.Options(options...), ) @@ -74,15 +75,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option return listener.Stop(ctx) }), )), - fx.Provide(fx.Annotate( - remote, - fx.OnStart(func(_ context.Context, client core.Client) error { - return client.Start() - }), - fx.OnStop(func(_ context.Context, client core.Client) error { - return client.Stop() - }), - )), ) default: panic("invalid node type") diff --git a/nodebuilder/core/opts.go b/nodebuilder/core/opts.go index 56347a5cb6..5de789daa9 100644 --- a/nodebuilder/core/opts.go +++ b/nodebuilder/core/opts.go @@ -2,15 +2,15 @@ package core import ( "go.uber.org/fx" + "google.golang.org/grpc" - "github.com/celestiaorg/celestia-node/core" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/libs/fxutil" ) -// WithClient sets custom client for core process -func WithClient(client core.Client) fx.Option { - return fxutil.ReplaceAs(client, new(core.Client)) +// WithConnection sets a custom client for core process +func WithConnection(conn *grpc.ClientConn) fx.Option { + return fxutil.ReplaceAs(conn, new(grpc.ClientConn)) } // WithHeaderConstructFn sets custom func that creates extended header diff --git a/nodebuilder/module.go b/nodebuilder/module.go index 5a774b8b9b..43de56eedd 100644 --- a/nodebuilder/module.go +++ b/nodebuilder/module.go @@ -42,14 +42,14 @@ func ConstructModule(tp node.Type, network p2p.Network, cfg *Config, store Store fx.Supply(store.Config), fx.Provide(store.Datastore), fx.Provide(store.Keystore), + core.ConstructModule(tp, &cfg.Core), fx.Supply(node.StorePath(store.Path())), // modules provided by the node p2p.ConstructModule(tp, &cfg.P2P), - state.ConstructModule(tp, &cfg.State, &cfg.Core), modhead.ConstructModule[*header.ExtendedHeader](tp, &cfg.Header), share.ConstructModule(tp, &cfg.Share), gateway.ConstructModule(tp, &cfg.Gateway), - core.ConstructModule(tp, &cfg.Core), + state.ConstructModule(tp, &cfg.State, &cfg.Core), das.ConstructModule(tp, &cfg.DASer), fraud.ConstructModule(tp), blob.ConstructModule(), diff --git a/nodebuilder/node_bridge_test.go b/nodebuilder/node_bridge_test.go index d2b7ebaf4e..5647a6afa1 100644 --- a/nodebuilder/node_bridge_test.go +++ b/nodebuilder/node_bridge_test.go @@ -2,9 +2,12 @@ package nodebuilder import ( "context" + "net" "testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-node/core" coremodule "github.com/celestiaorg/celestia-node/nodebuilder/core" @@ -19,8 +22,14 @@ func TestBridge_WithMockedCoreClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - client := core.StartTestNode(t).Client - node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithClient(client)) + _, _, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + con, err := grpc.NewClient( + core.StartTestNode(t).GRPCClient.Target(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithConnection(con)) require.NoError(t, err) require.NotNil(t, node) err = node.Start(ctx) diff --git a/nodebuilder/node_test.go b/nodebuilder/node_test.go index bcf5f01942..d78be08903 100644 --- a/nodebuilder/node_test.go +++ b/nodebuilder/node_test.go @@ -4,6 +4,7 @@ package nodebuilder import ( "context" + "net" "net/http" "net/http/httptest" "strconv" @@ -15,6 +16,7 @@ import ( collectormetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" "google.golang.org/protobuf/proto" + "github.com/celestiaorg/celestia-node/core" "github.com/celestiaorg/celestia-node/nodebuilder/node" ) @@ -29,7 +31,16 @@ func TestLifecycle(t *testing.T) { for i, tt := range test { t.Run(strconv.Itoa(i), func(t *testing.T) { - node := TestNode(t, tt.tp) + // we're also creating a test node because the gRPC connection + // is started automatically when starting the node. + host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + + cfg := DefaultConfig(tt.tp) + cfg.Core.IP = host + cfg.Core.Port = port + + node := TestNodeWithConfig(t, tt.tp, cfg) require.NotNil(t, node) require.NotNil(t, node.Config) require.NotNil(t, node.Host) @@ -41,7 +52,7 @@ func TestLifecycle(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := node.Start(ctx) + err = node.Start(ctx) require.NoError(t, err) err = node.Stop(ctx) @@ -67,9 +78,19 @@ func TestLifecycle_WithMetrics(t *testing.T) { for i, tt := range test { t.Run(strconv.Itoa(i), func(t *testing.T) { - node := TestNode( + // we're also creating a test node because the gRPC connection + // is started automatically when starting the node. + host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + + cfg := DefaultConfig(tt.tp) + cfg.Core.IP = host + cfg.Core.Port = port + + node := TestNodeWithConfig( t, tt.tp, + cfg, WithMetrics( []otlpmetrichttp.Option{ otlpmetrichttp.WithEndpoint(otelCollectorURL), @@ -88,7 +109,7 @@ func TestLifecycle_WithMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := node.Start(ctx) + err = node.Start(ctx) require.NoError(t, err) err = node.Stop(ctx) diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index 39ab732368..d66da88c44 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -2,12 +2,12 @@ package state import ( "github.com/cosmos/cosmos-sdk/crypto/keyring" + "google.golang.org/grpc" libfraud "github.com/celestiaorg/go-fraud" "github.com/celestiaorg/go-header/sync" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/nodebuilder/core" modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/share/eds/byzantine" @@ -17,27 +17,24 @@ import ( // coreAccessor constructs a new instance of state.Module over // a celestia-core connection. func coreAccessor( - corecfg core.Config, keyring keyring.Keyring, keyname AccountName, sync *sync.Syncer[*header.ExtendedHeader], fraudServ libfraud.Service[*header.ExtendedHeader], network p2p.Network, - opts []state.Option, + client *grpc.ClientConn, ) ( *state.CoreAccessor, Module, *modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader], error, ) { - ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, corecfg.IP, corecfg.GRPCPort, - network.String(), opts...) + ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String()) sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{ Service: ca, FraudType: byzantine.BadEncoding, FraudServ: fraudServ, } - return ca, ca, sBreaker, err } diff --git a/nodebuilder/state/module.go b/nodebuilder/state/module.go index bd6f2081d3..0e80ab3209 100644 --- a/nodebuilder/state/module.go +++ b/nodebuilder/state/module.go @@ -23,11 +23,9 @@ var log = logging.Logger("module/state") func ConstructModule(tp node.Type, cfg *Config, coreCfg *core.Config) fx.Option { // sanitize config values before constructing module cfgErr := cfg.Validate() - opts := make([]state.Option, 0) baseComponents := fx.Options( fx.Supply(*cfg), fx.Error(cfgErr), - fx.Supply(opts), fx.Provide(func(ks keystore.Keystore) (keyring.Keyring, AccountName, error) { return Keyring(*cfg, ks) }), diff --git a/nodebuilder/testing.go b/nodebuilder/testing.go index 0f2e046882..44c026f007 100644 --- a/nodebuilder/testing.go +++ b/nodebuilder/testing.go @@ -1,12 +1,15 @@ package nodebuilder import ( + "net" "testing" "github.com/cosmos/cosmos-sdk/crypto/hd" "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/stretchr/testify/require" "go.uber.org/fx" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" libhead "github.com/celestiaorg/go-header" @@ -69,12 +72,18 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti fxutil.ReplaceAs(headertest.NewStore(t), new(libhead.Store[*header.ExtendedHeader])), ) - // in fact, we don't need core.Client in tests, but Bridge requires is a valid one - // or fails otherwise with failed attempt to connect with custom build client + // in fact, we don't need core.Client in tests, but the Bridge node requires a valid one. + // otherwise, it fails with a failed attempt to connect with a custom build client. if tp == node.Bridge { - cctx := core.StartTestNode(t) + _, _, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + con, err := grpc.NewClient( + core.StartTestNode(t).GRPCClient.Target(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) opts = append(opts, - fxutil.ReplaceAs(cctx.Client, new(core.Client)), + fxutil.ReplaceAs(con, new(grpc.ClientConn)), ) } diff --git a/nodebuilder/tests/fraud_test.go b/nodebuilder/tests/fraud_test.go index 524b306ac0..b156efe226 100644 --- a/nodebuilder/tests/fraud_test.go +++ b/nodebuilder/tests/fraud_test.go @@ -4,6 +4,7 @@ package tests import ( "context" + "net" "testing" "time" @@ -68,6 +69,8 @@ func TestFraudProofHandling(t *testing.T) { }) cfg := nodebuilder.DefaultConfig(node.Bridge) + cfg.Core.IP, cfg.Core.Port, err = net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) // 1. bridge := sw.NewNodeWithConfig( node.Bridge, diff --git a/nodebuilder/tests/p2p_test.go b/nodebuilder/tests/p2p_test.go index 98e9fc15b4..4f7e87b5fb 100644 --- a/nodebuilder/tests/p2p_test.go +++ b/nodebuilder/tests/p2p_test.go @@ -4,6 +4,7 @@ package tests import ( "context" + "net" "testing" "time" @@ -77,8 +78,11 @@ func TestFullDiscoveryViaBootstrapper(t *testing.T) { // create and start a BN cfg := nodebuilder.DefaultConfig(node.Bridge) setTimeInterval(cfg, defaultTimeInterval) + var err error + cfg.Core.IP, cfg.Core.Port, err = net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) bridge := sw.NewNodeWithConfig(node.Bridge, cfg) - err := bridge.Start(ctx) + err = bridge.Start(ctx) require.NoError(t, err) // use BN as the bootstrapper @@ -154,8 +158,11 @@ func TestRestartNodeDiscovery(t *testing.T) { // create and start a BN as a bootstrapper fullCfg := nodebuilder.DefaultConfig(node.Bridge) setTimeInterval(fullCfg, defaultTimeInterval) + var err error + fullCfg.Core.IP, fullCfg.Core.Port, err = net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) bridge := sw.NewNodeWithConfig(node.Bridge, fullCfg) - err := bridge.Start(ctx) + err = bridge.Start(ctx) require.NoError(t, err) bridgeAddr := host.InfoFromHost(bridge.Host) diff --git a/nodebuilder/tests/prune_test.go b/nodebuilder/tests/prune_test.go index a92117b7f2..6c6cccea50 100644 --- a/nodebuilder/tests/prune_test.go +++ b/nodebuilder/tests/prune_test.go @@ -5,6 +5,7 @@ package tests import ( "bytes" "context" + "net" "testing" "time" @@ -185,9 +186,12 @@ func TestConvertFromPrunedToArchival(t *testing.T) { for _, nt := range []node.Type{node.Bridge, node.Full} { pruningCfg := nodebuilder.DefaultConfig(nt) pruningCfg.Pruner.EnableService = true + var err error + pruningCfg.Core.IP, pruningCfg.Core.Port, err = net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) store := nodebuilder.MockStore(t, pruningCfg) pruningNode := sw.MustNewNodeWithStore(nt, store) - err := pruningNode.Start(ctx) + err = pruningNode.Start(ctx) require.NoError(t, err) err = pruningNode.Stop(ctx) require.NoError(t, err) diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 57d46bb64b..3fe779e76d 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -18,6 +18,8 @@ import ( "github.com/tendermint/tendermint/types" "go.uber.org/fx" "golang.org/x/exp/maps" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/test/util/testnode" libhead "github.com/celestiaorg/go-header" @@ -179,8 +181,19 @@ func (s *Swamp) setupGenesis() { store, err := store.NewStore(store.DefaultParameters(), s.t.TempDir()) require.NoError(s.t, err) + host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) + require.NoError(s.t, err) + addr := net.JoinHostPort(host, port) + con, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(s.t, err) + fetcher, err := core.NewBlockFetcher(con) + require.NoError(s.t, err) + ex, err := core.NewExchange( - core.NewBlockFetcher(s.ClientContext.Client), + fetcher, store, header.MakeExtendedHeader, ) @@ -199,7 +212,7 @@ func (s *Swamp) DefaultTestConfig(tp node.Type) *nodebuilder.Config { require.NoError(s.t, err) cfg.Core.IP = ip - cfg.Core.GRPCPort = port + cfg.Core.Port = port return cfg } @@ -207,6 +220,9 @@ func (s *Swamp) DefaultTestConfig(tp node.Type) *nodebuilder.Config { // and a mockstore to the MustNewNodeWithStore method func (s *Swamp) NewBridgeNode(options ...fx.Option) *nodebuilder.Node { cfg := s.DefaultTestConfig(node.Bridge) + var err error + cfg.Core.IP, cfg.Core.Port, err = net.SplitHostPort(s.ClientContext.GRPCClient.Target()) + require.NoError(s.t, err) store := nodebuilder.MockStore(s.t, cfg) return s.MustNewNodeWithStore(node.Bridge, store, options...) @@ -278,8 +294,18 @@ func (s *Swamp) NewNodeWithStore( switch tp { case node.Bridge: + host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) + if err != nil { + return nil, err + } + addr := net.JoinHostPort(host, port) + con, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(s.t, err) options = append(options, - coremodule.WithClient(s.ClientContext.Client), + coremodule.WithConnection(con), ) default: } diff --git a/state/core_access.go b/state/core_access.go index a363577b1c..07747f5333 100644 --- a/state/core_access.go +++ b/state/core_access.go @@ -19,8 +19,6 @@ import ( "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/proto/tendermint/crypto" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/app" "github.com/celestiaorg/celestia-app/v3/app/encoding" @@ -42,10 +40,6 @@ var ( log = logging.Logger("state") ) -// Option is the functional option that is applied to the coreAccessor instance -// to configure parameters. -type Option func(ca *CoreAccessor) - // CoreAccessor implements service over a gRPC connection // with a celestia-core node. type CoreAccessor struct { @@ -68,8 +62,6 @@ type CoreAccessor struct { prt *merkle.ProofRuntime coreConn *grpc.ClientConn - coreIP string - grpcPort string network string // these fields are mutatable and thus need to be protected by a mutex @@ -90,10 +82,8 @@ func NewCoreAccessor( keyring keyring.Keyring, keyname string, getter libhead.Head[*header.ExtendedHeader], - coreIP, - grpcPort string, + conn *grpc.ClientConn, network string, - options ...Option, ) (*CoreAccessor, error) { // create verifier prt := merkle.DefaultProofRuntime() @@ -104,46 +94,18 @@ func NewCoreAccessor( keyring: keyring, defaultSignerAccount: keyname, getter: getter, - coreIP: coreIP, - grpcPort: grpcPort, prt: prt, + coreConn: conn, network: network, } - - for _, opt := range options { - opt(ca) - } return ca, nil } func (ca *CoreAccessor) Start(ctx context.Context) error { - if ca.coreConn != nil { - return fmt.Errorf("core-access: already connected to core endpoint") - } ca.ctx, ca.cancel = context.WithCancel(context.Background()) - - // dial given celestia-core endpoint - endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort) - client, err := grpc.NewClient( - endpoint, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - return err - } - // this ensures we can't start the node without core connection - client.Connect() - if !client.WaitForStateChange(ctx, connectivity.Ready) { - // hits the case when context is canceled - return fmt.Errorf("couldn't connect to core endpoint(%s): %w", endpoint, ctx.Err()) - } - - ca.coreConn = client - // create the staking query client ca.stakingCli = stakingtypes.NewQueryClient(ca.coreConn) ca.feeGrantCli = feegrant.NewQueryClient(ca.coreConn) - // create ABCI query client ca.abciQueryCli = tmservice.NewServiceClient(ca.coreConn) resp, err := ca.abciQueryCli.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{}) @@ -171,29 +133,8 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { } func (ca *CoreAccessor) Stop(context.Context) error { - if ca.cancel == nil { - log.Warn("core accessor already stopped") - return nil - } - if ca.coreConn == nil { - log.Warn("no connection found to close") - return nil - } - defer ca.cancelCtx() - - // close out core connection - err := ca.coreConn.Close() - if err != nil { - return err - } - - ca.coreConn = nil - return nil -} - -func (ca *CoreAccessor) cancelCtx() { ca.cancel() - ca.cancel = nil + return nil } // SubmitPayForBlob builds, signs, and synchronously submits a MsgPayForBlob with additional diff --git a/state/core_access_test.go b/state/core_access_test.go index c487944749..cde4e8182f 100644 --- a/state/core_access_test.go +++ b/state/core_access_test.go @@ -6,13 +6,14 @@ import ( "context" "errors" "fmt" - "strings" "testing" "time" sdktypes "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/app" "github.com/celestiaorg/celestia-app/v3/pkg/appconsts" @@ -216,11 +217,6 @@ func TestDelegate(t *testing.T) { } } -func extractPort(addr string) string { - splitStr := strings.Split(addr, ":") - return splitStr[len(splitStr)-1] -} - func buildAccessor(t *testing.T) (*CoreAccessor, []string) { chainID := "private" @@ -264,7 +260,9 @@ func buildAccessor(t *testing.T) (*CoreAccessor, []string) { WithAppCreator(appCreator) // needed until https://github.com/celestiaorg/celestia-app/pull/3680 merges cctx, _, grpcAddr := testnode.NewNetwork(t, config) - ca, err := NewCoreAccessor(cctx.Keyring, accounts[0].Name, nil, "127.0.0.1", extractPort(grpcAddr), chainID) + conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + ca, err := NewCoreAccessor(cctx.Keyring, accounts[0].Name, nil, conn, chainID) require.NoError(t, err) return ca, getNames(accounts) } diff --git a/state/integration_test.go b/state/integration_test.go index 8680b4d181..f606841417 100644 --- a/state/integration_test.go +++ b/state/integration_test.go @@ -52,8 +52,11 @@ func (s *IntegrationTestSuite) SetupSuite() { s.Require().Greater(len(s.accounts), 0) accountName := s.accounts[0].Name - accessor, err := NewCoreAccessor(s.cctx.Keyring, accountName, localHeader{s.cctx.Client}, "", "", "") + accessor, err := NewCoreAccessor(s.cctx.Keyring, accountName, localHeader{s.cctx.Client}, nil, "") require.NoError(s.T(), err) + ctx, cancel := context.WithCancel(context.Background()) + accessor.ctx = ctx + accessor.cancel = cancel setClients(accessor, s.cctx.GRPCClient) s.accessor = accessor @@ -65,8 +68,7 @@ func (s *IntegrationTestSuite) SetupSuite() { func setClients(ca *CoreAccessor, conn *grpc.ClientConn) { ca.coreConn = conn // create the staking query client - stakingCli := stakingtypes.NewQueryClient(ca.coreConn) - ca.stakingCli = stakingCli + ca.stakingCli = stakingtypes.NewQueryClient(ca.coreConn) ca.abciQueryCli = tmservice.NewServiceClient(ca.coreConn) }