Skip to content

Commit

Permalink
Reuse the market process PubSub instance in index provider engine
Browse files Browse the repository at this point in the history
The markets process instantiates its own `PubSub` instance with all
validators, peer scoring, etc. set up. Use that instane to join the
indexing topic, otherwise the default topic instantiated by
index-provider internally (via go-legs) has no validators.
  • Loading branch information
masih committed Apr 6, 2022
1 parent 8a16e1c commit cc2f805
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions node/modules/storageminer_idxprov.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"golang.org/x/xerrors"

Expand All @@ -23,25 +24,40 @@ type IdxProv struct {
Datastore dtypes.MetadataDS
}

func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) {
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider"))
var opts = []engine.Option{
engine.WithDatastore(ipds),
engine.WithHost(marketHost),
engine.WithRetrievalAddrs(marketHost.Addrs()...),
}

llog := log.With("idxProvEnabled", cfg.Enable, "pid", marketHost.ID(), "retAddrs", marketHost.Addrs())
llog := log.With(
"idxProvEnabled", cfg.Enable,
"pid", marketHost.ID(),
"topic", cfg.TopicName,
"retAddrs", marketHost.Addrs())
// If announcements to the network are enabled, then set options for datatransfer publisher.
if cfg.Enable {
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider
// engine would create its own instance of pubsub down the line in go-legs, which has
// no validators by default.
t, err := ps.Join(cfg.TopicName)
if err != nil {
llog.Errorw("Failed to join indexer topic", "err", err)
return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err)
}

// Get the miner ID and set as extra gossip data.
// The extra data is required by the lotus-specific index-provider gossip message validators.
ma := address.Address(maddr)
opts = append(opts,
engine.WithPublisherKind(engine.DataTransferPublisher),
engine.WithDataTransfer(dt),
engine.WithExtraGossipData(ma.Bytes()))
engine.WithExtraGossipData(ma.Bytes()),
engine.WithTopic(t),
)
llog = llog.With("extraGossipData", ma)
} else {
opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))
Expand Down

0 comments on commit cc2f805

Please sign in to comment.