diff --git a/CHANGELOG.md b/CHANGELOG.md index c6953891fa2..0b753e60122 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ For certain node operators, such as full archival nodes or systems that need to - The miner actor builtin `QAPowerForWeight` no longer accepts the unused "dealWeight" parameter, the function signature now only takes 3 arguments: sectorSize, sectorDuration, verifiedWeight. ([filecoin-project/lotus#12445](https://github.com/filecoin-project/lotus/pull/12445)) - Fix checkpointed tipsets being expanded ([filecoin-project/lotus#12747](https://github.com/filecoin-project/lotus/pull/12747)) +- Remove IPNI advertisement relay over pubsub via Lotus node as it now has been deprecated. ([filecoin-project/lotus#12768](https://github.com/filecoin-project/lotus/pull/12768) ## Bug Fixes diff --git a/build/params_shared_funcs.go b/build/params_shared_funcs.go index 05dbe7817fa..8fe0ee07e36 100644 --- a/build/params_shared_funcs.go +++ b/build/params_shared_funcs.go @@ -14,17 +14,6 @@ import ( func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) } func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) } -func IndexerIngestTopic(netName dtypes.NetworkName) string { - - nn := string(netName) - // The network name testnetnet is here for historical reasons. - // Going forward we aim to use the name `mainnet` where possible. - if nn == "testnetnet" { - nn = "mainnet" - } - - return "/indexer/ingest/" + nn -} func DhtProtocolName(netName dtypes.NetworkName) protocol.ID { return protocol.ID("/fil/kad/" + string(netName)) } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index bc5b09842bd..f21af601eb1 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -1,11 +1,8 @@ package sub import ( - "bytes" "context" - "encoding/binary" "errors" - "sync" "time" lru "github.com/hashicorp/golang-lru/v2" @@ -13,7 +10,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/ipni/go-libipni/announce/message" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" @@ -30,10 +26,8 @@ import ( "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/sub/ratelimit" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" - "github.com/filecoin-project/lotus/node/impl/full" ) var log = logging.Logger("sub") @@ -470,167 +464,3 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType ) stats.Record(ctx, metric.M(1)) } - -type peerMsgInfo struct { - peerID peer.ID - lastCid cid.Cid - lastSeqno uint64 - rateLimit *ratelimit.Window - mutex sync.Mutex -} - -type IndexerMessageValidator struct { - self peer.ID - - peerCache *lru.TwoQueueCache[address.Address, *peerMsgInfo] - chainApi full.ChainModuleAPI - stateApi full.StateModuleAPI -} - -func NewIndexerMessageValidator(self peer.ID, chainApi full.ChainModuleAPI, stateApi full.StateModuleAPI) *IndexerMessageValidator { - peerCache, _ := lru.New2Q[address.Address, *peerMsgInfo](8192) - - return &IndexerMessageValidator{ - self: self, - peerCache: peerCache, - chainApi: chainApi, - stateApi: stateApi, - } -} - -func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { - // This chain-node should not be publishing its own messages. These are - // relayed from market-nodes. If a node appears to be local, reject it. - if pid == v.self { - log.Debug("ignoring indexer message from self") - stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationIgnore - } - originPeer := msg.GetFrom() - if originPeer == v.self { - log.Debug("ignoring indexer message originating from self") - stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationIgnore - } - - idxrMsg := message.Message{} - err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data)) - if err != nil { - log.Errorw("Could not decode indexer pubsub message", "err", err) - return pubsub.ValidationReject - } - if len(idxrMsg.ExtraData) == 0 { - log.Debugw("ignoring message missing miner id", "peer", originPeer) - return pubsub.ValidationIgnore - } - - // Get miner info from lotus - minerAddr, err := address.NewFromBytes(idxrMsg.ExtraData) - if err != nil { - log.Warnw("cannot parse extra data as miner address", "err", err, "extraData", idxrMsg.ExtraData) - return pubsub.ValidationReject - } - - msgCid := idxrMsg.Cid - - msgInfo, cached := v.peerCache.Get(minerAddr) - if !cached { - msgInfo = &peerMsgInfo{} - } - - // Lock this peer's message info. - msgInfo.mutex.Lock() - defer msgInfo.mutex.Unlock() - - var seqno uint64 - if cached { - // Reject replayed messages. - seqno = binary.BigEndian.Uint64(msg.Message.GetSeqno()) - if seqno <= msgInfo.lastSeqno { - log.Debugf("ignoring replayed indexer message") - return pubsub.ValidationIgnore - } - } - - if !cached || originPeer != msgInfo.peerID { - // Check that the miner ID maps to the peer that sent the message. - err = v.authenticateMessage(ctx, minerAddr, originPeer) - if err != nil { - log.Warnw("cannot authenticate message", "err", err, "peer", originPeer, "minerID", minerAddr) - stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationReject - } - msgInfo.peerID = originPeer - if !cached { - // Add msgInfo to cache only after being authenticated. If two - // messages from the same peer are handled concurrently, there is a - // small chance that one msgInfo could replace the other here when - // the info is first cached. This is OK, so no need to prevent it. - v.peerCache.Add(minerAddr, msgInfo) - } - } - - // Update message info cache with the latest message's sequence number. - msgInfo.lastSeqno = seqno - - // See if message needs to be ignored due to rate limiting. - if v.rateLimitPeer(msgInfo, msgCid) { - return pubsub.ValidationIgnore - } - - stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1)) - return pubsub.ValidationAccept -} - -func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid.Cid) bool { - const ( - msgLimit = 5 - msgTimeLimit = 10 * time.Second - repeatTimeLimit = 2 * time.Hour - ) - - timeWindow := msgInfo.rateLimit - - // Check overall message rate. - if timeWindow == nil { - timeWindow = ratelimit.NewWindow(msgLimit, msgTimeLimit) - msgInfo.rateLimit = timeWindow - } else if msgInfo.lastCid == msgCid { - // Check if this is a repeat of the previous message data. - if time.Since(timeWindow.Newest()) < repeatTimeLimit { - log.Warnw("ignoring repeated indexer message", "sender", msgInfo.peerID) - return true - } - } - - err := timeWindow.Add() - if err != nil { - log.Warnw("ignoring indexer message", "sender", msgInfo.peerID, "err", err) - return true - } - - msgInfo.lastCid = msgCid - - return false -} - -func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerAddress address.Address, peerID peer.ID) error { - ts, err := v.chainApi.ChainHead(ctx) - if err != nil { - return err - } - - minerInfo, err := v.stateApi.StateMinerInfo(ctx, minerAddress, ts.Key()) - if err != nil { - return err - } - - if minerInfo.PeerId == nil { - return xerrors.New("no peer id for miner") - } - if *minerInfo.PeerId != peerID { - return xerrors.New("miner id does not map to peer that sent message") - } - - return nil -} diff --git a/chain/sub/incoming_test.go b/chain/sub/incoming_test.go index d8ee99b7f12..aeac1e27570 100644 --- a/chain/sub/incoming_test.go +++ b/chain/sub/incoming_test.go @@ -2,23 +2,14 @@ package sub import ( - "bytes" "context" "testing" - "github.com/golang/mock/gomock" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipni/go-libipni/announce/message" - pubsub "github.com/libp2p/go-libp2p-pubsub" - pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/mocks" "github.com/filecoin-project/lotus/chain/types" ) @@ -74,185 +65,3 @@ func TestFetchCidsWithDedup(t *testing.T) { t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1]) } } - -func TestIndexerMessageValidator_Validate(t *testing.T) { - validCid, err := cid.Decode("QmbpDgg5kRLDgMxS8vPKNFXEcA6D5MC4CkuUdSWDVtHPGK") - if err != nil { - t.Fatal(err) - } - tests := []struct { - name string - selfPID string - senderPID string - extraData []byte - wantValidation pubsub.ValidationResult - }{ - { - name: "invalid extra data is rejected", - selfPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW", - senderPID: "12D3KooWE8yt84RVwW3sFcd6WMjbUdWrZer2YtT4dmtj3dHdahSZ", - extraData: []byte("f0127896"), // note, casting encoded address to byte is invalid. - wantValidation: pubsub.ValidationReject, - }, - { - name: "same sender and receiver is ignored", - selfPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW", - senderPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW", - wantValidation: pubsub.ValidationIgnore, - }, - } - for _, tc := range tests { - tc := tc - t.Run(tc.name, func(t *testing.T) { - mc := gomock.NewController(t) - node := mocks.NewMockFullNode(mc) - subject := NewIndexerMessageValidator(peer.ID(tc.selfPID), node, node) - message := message.Message{ - Cid: validCid, - Addrs: nil, - ExtraData: tc.extraData, - } - buf := bytes.NewBuffer(nil) - if err := message.MarshalCBOR(buf); err != nil { - t.Fatal(err) - } - - topic := "topic" - pbm := &pb.Message{ - Data: buf.Bytes(), - Topic: &topic, - From: nil, - Seqno: nil, - } - validate := subject.Validate(context.Background(), peer.ID(tc.senderPID), &pubsub.Message{ - Message: pbm, - ReceivedFrom: peer.ID(tc.senderPID), - ValidatorData: nil, - }) - - if validate != tc.wantValidation { - t.Fatalf("expected %v but got %v", tc.wantValidation, validate) - } - }) - } -} - -func TestIdxValidator(t *testing.T) { - validCid, err := cid.Decode("QmbpDgg5kRLDgMxS8vPKNFXEcA6D5MC4CkuUdSWDVtHPGK") - if err != nil { - t.Fatal(err) - } - - addr, err := address.NewFromString("f01024") - if err != nil { - t.Fatal(err) - } - - buf1, err := addr.MarshalBinary() - if err != nil { - t.Fatal(err) - } - - selfPID := "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW" - senderPID := "12D3KooWE8yt84RVwW3sFcd6WMjbUdWrZer2YtT4dmtj3dHdahSZ" - extraData := buf1 - - mc := gomock.NewController(t) - node := mocks.NewMockFullNode(mc) - node.EXPECT().ChainHead(gomock.Any()).Return(nil, nil).AnyTimes() - - subject := NewIndexerMessageValidator(peer.ID(selfPID), node, node) - message := message.Message{ - Cid: validCid, - Addrs: nil, - ExtraData: extraData, - } - buf := bytes.NewBuffer(nil) - if err := message.MarshalCBOR(buf); err != nil { - t.Fatal(err) - } - - topic := "topic" - - privk, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) - if err != nil { - t.Fatal(err) - } - id, err := peer.IDFromPublicKey(privk.GetPublic()) - if err != nil { - t.Fatal(err) - } - - node.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{PeerId: &id}, nil).AnyTimes() - - pbm := &pb.Message{ - Data: buf.Bytes(), - Topic: &topic, - From: []byte(id), - Seqno: []byte{1, 1, 1, 1, 2, 2, 2, 2}, - } - validate := subject.Validate(context.Background(), peer.ID(senderPID), &pubsub.Message{ - Message: pbm, - ReceivedFrom: peer.ID("f01024"), // peer.ID(senderPID), - ValidatorData: nil, - }) - if validate != pubsub.ValidationAccept { - t.Error("Expected to receive ValidationAccept") - } - msgInfo, cached := subject.peerCache.Get(addr) - if !cached { - t.Fatal("Message info should be in cache") - } - seqno := msgInfo.lastSeqno - msgInfo.rateLimit = nil // prevent interference from rate limiting - - t.Log("Sending DoS msg") - privk, _, err = crypto.GenerateKeyPair(crypto.RSA, 2048) - if err != nil { - t.Fatal(err) - } - id2, err := peer.IDFromPublicKey(privk.GetPublic()) - if err != nil { - t.Fatal(err) - } - pbm = &pb.Message{ - Data: buf.Bytes(), - Topic: &topic, - From: []byte(id2), - Seqno: []byte{255, 255, 255, 255, 255, 255, 255, 255}, - } - validate = subject.Validate(context.Background(), peer.ID(senderPID), &pubsub.Message{ - Message: pbm, - ReceivedFrom: peer.ID(senderPID), - ValidatorData: nil, - }) - if validate != pubsub.ValidationReject { - t.Error("Expected to get ValidationReject") - } - msgInfo, cached = subject.peerCache.Get(addr) - if !cached { - t.Fatal("Message info should be in cache") - } - msgInfo.rateLimit = nil // prevent interference from rate limiting - - // Check if DoS is possible. - if msgInfo.lastSeqno != seqno { - t.Fatal("Sequence number should not have been updated") - } - - t.Log("Sending another valid message from miner...") - pbm = &pb.Message{ - Data: buf.Bytes(), - Topic: &topic, - From: []byte(id), - Seqno: []byte{1, 1, 1, 1, 2, 2, 2, 3}, - } - validate = subject.Validate(context.Background(), peer.ID(senderPID), &pubsub.Message{ - Message: pbm, - ReceivedFrom: peer.ID("f01024"), // peer.ID(senderPID), - ValidatorData: nil, - }) - if validate != pubsub.ValidationAccept { - t.Fatal("Did not receive ValidationAccept") - } -} diff --git a/go.mod b/go.mod index 99831b20ed1..8fe09b33115 100644 --- a/go.mod +++ b/go.mod @@ -99,7 +99,6 @@ require ( github.com/ipld/go-car v0.6.2 github.com/ipld/go-car/v2 v2.13.1 github.com/ipld/go-ipld-prime v0.21.0 - github.com/ipni/go-libipni v0.0.8 github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 github.com/jpillora/backoff v1.0.0 github.com/kelseyhightower/envconfig v1.4.0 diff --git a/go.sum b/go.sum index 2136ad0650a..0d843ff4ceb 100644 --- a/go.sum +++ b/go.sum @@ -701,8 +701,6 @@ github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOan github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:wZ8hH8UxeryOs4kJEJaiui/s00hDSbE37OKsL47g+Sw= -github.com/ipni/go-libipni v0.0.8 h1:0wLfZRSBG84swmZwmaLKul/iB/FlBkkl9ZcR1ub+Z+w= -github.com/ipni/go-libipni v0.0.8/go.mod h1:paYP9U4N3/vOzGCuN9kU972vtvw9JUcQjOKyiCFGwRk= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= diff --git a/node/builder.go b/node/builder.go index 7d03e9593a4..98b40355dfa 100644 --- a/node/builder.go +++ b/node/builder.go @@ -104,6 +104,13 @@ const ( HandleIncomingMessagesKey HandlePaymentChannelManagerKey + // Deprecated: RelayIndexerMessagesKey is no longer used, since IPNI has + // deprecated the use of GossipSub for propagating advertisements. Use IPNI Sync + // protocol instead. + // + // See: + // - https://github.com/ipni/specs/blob/main/IPNI_HTTP_PROVIDER.md + // - https://github.com/ipni/go-libipni/tree/main/dagsync/ipnisync RelayIndexerMessagesKey // miner diff --git a/node/builder_chain.go b/node/builder_chain.go index 72d6f2ee7f1..1293dcd0c76 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -129,8 +129,6 @@ var ChainNode = Options( Override(new(*full.GasPriceCache), full.NewGasPriceCache), - Override(RelayIndexerMessagesKey, modules.RelayIndexerMessages), - // Lite node API ApplyIf(isLiteNode, Override(new(messagepool.Provider), messagepool.NewProviderLite), diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 20a222cd21c..a8560a539ea 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -145,22 +145,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), } - ingestTopicParams := &pubsub.TopicScoreParams{ - // expected ~0.5 confirmed deals / min. sampled - TopicWeight: 0.1, - - TimeInMeshWeight: 0.00027, // ~1/3600 - TimeInMeshQuantum: time.Second, - TimeInMeshCap: 1, - - FirstMessageDeliveriesWeight: 0.5, - FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - FirstMessageDeliveriesCap: 100, // allowing for burstiness - - InvalidMessageDeliveriesWeight: -1000, - InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - } - topicParams := map[string]*pubsub.TopicScoreParams{ build.BlocksTopic(in.Nn): { // expected 10 blocks/min @@ -255,9 +239,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { drandTopics = append(drandTopics, topic) } - // Index ingestion whitelist - topicParams[build.IndexerIngestTopic(in.Nn)] = ingestTopicParams - // IP colocation whitelist var ipcoloWhitelist []*net.IPNet for _, cidr := range in.Cfg.IPColocationWhitelist { @@ -382,7 +363,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { allowTopics := []string{ build.BlocksTopic(in.Nn), build.MessagesTopic(in.Nn), - build.IndexerIngestTopic(in.Nn), } allowTopics = append(allowTopics, drandTopics...) diff --git a/node/modules/services.go b/node/modules/services.go index 37ae325d3ec..267bbe0b81d 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -31,7 +31,6 @@ import ( "github.com/filecoin-project/lotus/journal/fsjournal" "github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/node/hello" - "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" @@ -189,36 +188,6 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } -func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModuleAPI, stateModule full.StateModuleAPI) error { - topicName := build.IndexerIngestTopic(nn) - - v := sub.NewIndexerMessageValidator(h.ID(), chainModule, stateModule) - - if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { - return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err) - } - - topicHandle, err := ps.Join(topicName) - if err != nil { - return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err) - } - cancelFunc, err := topicHandle.Relay() - if err != nil { - return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err) - } - - // Cancel message relay on shutdown. - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - cancelFunc() - return nil - }, - }) - - log.Infof("relaying messages for pubsub topic %s", topicName) - return nil -} - type RandomBeaconParams struct { fx.In