Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: market: Reuse the market PubSub in index provider #8443

Merged
merged 2 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions node/builder_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func ConfigStorageMiner(c interface{}) Option {
enableLibp2pNode := cfg.Subsystems.EnableMarkets // we enable libp2p nodes if the storage market subsystem is enabled, otherwise we don't

return Options(

// Needed to instantiate pubsub used by index provider via ConfigCommon
Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig),
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap),
ConfigCommon(&cfg.Common, enableLibp2pNode),

Override(CheckFDLimit, modules.CheckFdLimit(build.MinerFDLimit)), // recommend at least 100k FD limit to miners
Expand Down
24 changes: 20 additions & 4 deletions node/modules/storageminer_idxprov.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"golang.org/x/xerrors"

Expand All @@ -23,8 +24,8 @@ type IdxProv struct {
Datastore dtypes.MetadataDS
}

func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress) (provider.Interface, error) {
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider"))
var opts = []engine.Option{
engine.WithDatastore(ipds),
Expand All @@ -36,16 +37,31 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo
engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart),
}

llog := log.With("idxProvEnabled", cfg.Enable, "pid", marketHost.ID(), "retAddrs", marketHost.Addrs())
llog := log.With(
"idxProvEnabled", cfg.Enable,
"pid", marketHost.ID(),
"topic", cfg.TopicName,
"retAddrs", marketHost.Addrs())
// If announcements to the network are enabled, then set options for datatransfer publisher.
if cfg.Enable {
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider
// engine would create its own instance of pubsub down the line in go-legs, which has
// no validators by default.
t, err := ps.Join(cfg.TopicName)
Comment on lines +47 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why should the market process be subscribed to the topic? it shouldn't be receiving or interested in any messages on this topic - it only produced the messages and we don't want it to relay or receive them.

Copy link
Member Author

@masih masih Apr 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To publish we also need a topic instance; at least go-legs publisher needs it. Is there a way to publish to a topic without joining it?

Also, the changes here are equivelant to what legs publisher does internally if topic is not set.

Subscribing to a topic (i.e. t.Subscribe) is another explicit call. .Join does not mean subscription I don't think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you can publish without subscribing.

Copy link
Contributor

@vyzo vyzo Apr 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we normally do it by calling publish directly, without a topic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and thats totally ok, getting the topic to publish is the caconical way since topic handles were added.

if err != nil {
llog.Errorw("Failed to join indexer topic", "err", err)
return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err)
}

// Get the miner ID and set as extra gossip data.
// The extra data is required by the lotus-specific index-provider gossip message validators.
ma := address.Address(maddr)
opts = append(opts,
engine.WithPublisherKind(engine.DataTransferPublisher),
engine.WithDataTransfer(dt),
engine.WithExtraGossipData(ma.Bytes()))
engine.WithExtraGossipData(ma.Bytes()),
engine.WithTopic(t),
)
llog = llog.With("extraGossipData", ma)
} else {
opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))
Expand Down