Skip to content

Commit

Permalink
Decode gossip extra data as []bytes
Browse files Browse the repository at this point in the history
The type of extra data in go-legs gossip is bytes. But when it is parsed
as miner ID, it is cast to string then parsed. Instead, it should be
decoded from bytes.

Override "testnetnet" to "mainnet" in indexer ingest topic.
  • Loading branch information
gammazero committed Feb 11, 2022
1 parent 0cb3d02 commit f015ff7
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 12 deletions.
9 changes: 8 additions & 1 deletion build/params_shared_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ import (
func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) }
func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) }
func IndexerIngestTopic(netName dtypes.NetworkName) string {
return "/indexer/ingest/" + string(netName)
nn := string(netName)
// The network name testnetnet is here for historical reasons.
// Going forward we aim to use the name `mainnet` where possible.
if nn == "testnetnet" {
nn = "mainnet"
}

return "/indexer/ingest/" + nn
}
func DhtProtocolName(netName dtypes.NetworkName) protocol.ID {
return protocol.ID("/fil/kad/" + string(netName))
Expand Down
21 changes: 11 additions & 10 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,14 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
return pubsub.ValidationIgnore
}

minerID := string(idxrMsg.ExtraData)
// Get miner info from lotus
minerAddr, err := address.NewFromBytes(idxrMsg.ExtraData)
if err != nil {
log.Warnw("cannot parse extra data as miner address", "err", err, "extraData", idxrMsg.ExtraData)
return pubsub.ValidationReject
}

minerID := minerAddr.String()
msgCid := idxrMsg.Cid

var msgInfo *peerMsgInfo
Expand All @@ -527,15 +534,15 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
// Reject replayed messages.
seqno := binary.BigEndian.Uint64(msg.Message.GetSeqno())
if seqno <= msgInfo.lastSeqno {
log.Debugf("ignoring replayed indexer message")
log.Debug("ignoring replayed indexer message")
return pubsub.ValidationIgnore
}
msgInfo.lastSeqno = seqno
}

if !ok || originPeer != msgInfo.peerID {
// Check that the miner ID maps to the peer that sent the message.
err = v.authenticateMessage(ctx, minerID, originPeer)
err = v.authenticateMessage(ctx, minerAddr, originPeer)
if err != nil {
log.Warnw("cannot authenticate messsage", "err", err, "peer", originPeer, "minerID", minerID)
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
Expand Down Expand Up @@ -592,13 +599,7 @@ func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid
return false
}

func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerID string, peerID peer.ID) error {
// Get miner info from lotus
minerAddress, err := address.NewFromString(minerID)
if err != nil {
return xerrors.Errorf("invalid miner id: %w", err)
}

func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerAddress address.Address, peerID peer.ID) error {
ts, err := v.chainApi.ChainHead(ctx)
if err != nil {
return err
Expand Down
71 changes: 70 additions & 1 deletion chain/sub/incoming_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package sub

import (
"bytes"
"context"
"testing"

address "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-legs/dtsync"
"github.com/filecoin-project/lotus/api/mocks"
"github.com/filecoin-project/lotus/chain/types"
"github.com/golang/mock/gomock"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

type getter struct {
Expand Down Expand Up @@ -61,3 +68,65 @@ func TestFetchCidsWithDedup(t *testing.T) {
t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1])
}
}

func TestIndexerMessageValidator_Validate(t *testing.T) {
validCid, err := cid.Decode("QmbpDgg5kRLDgMxS8vPKNFXEcA6D5MC4CkuUdSWDVtHPGK")
if err != nil {
t.Fatal(err)
}
tests := []struct {
name string
selfPID string
senderPID string
extraData []byte
wantValidation pubsub.ValidationResult
}{
{
name: "invalid extra data is rejected",
selfPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW",
senderPID: "12D3KooWE8yt84RVwW3sFcd6WMjbUdWrZer2YtT4dmtj3dHdahSZ",
extraData: []byte("f0127896"), // note, casting encoded address to byte is invalid.
wantValidation: pubsub.ValidationReject,
},
{
name: "same sender and receiver is ignored",
selfPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW",
senderPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW",
wantValidation: pubsub.ValidationIgnore,
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mc := gomock.NewController(t)
node := mocks.NewMockFullNode(mc)
subject := NewIndexerMessageValidator(peer.ID(tc.selfPID), node, node)
message := dtsync.Message{
Cid: validCid,
Addrs: nil,
ExtraData: tc.extraData,
}
buf := bytes.NewBuffer(nil)
if err := message.MarshalCBOR(buf); err != nil {
t.Fatal(err)
}

topic := "topic"
pbm := &pb.Message{
Data: buf.Bytes(),
Topic: &topic,
From: nil,
Seqno: nil,
}
validate := subject.Validate(context.Background(), peer.ID(tc.senderPID), &pubsub.Message{
Message: pbm,
ReceivedFrom: peer.ID(tc.senderPID),
ValidatorData: nil,
})

if validate != tc.wantValidation {
t.Fatalf("expected %v but got %v", tc.wantValidation, validate)
}
})
}
}

0 comments on commit f015ff7

Please sign in to comment.