Skip to content

Commit

Permalink
Infer index provider topic from network name by default
Browse files Browse the repository at this point in the history
Index provider integration uses a gossipsub topic to announce changes to
the advertised content. The topic name was fixed to the default topic
which is `/indexer/ingest/mainnet`.

In the case of lotus, the gossipsub validators enforce a list of topics
the instance is permitted to join by setting subscription filter option
when `PubSub` instance is constructed via DI.

Having the fixed topic name meant that any SP starting up a node on a
network other than `mainnet` would have to override the default config
to avoid the node crashing when index provider is enabled.

Instead of a fixed default, the changes here infer the allowed indexer
topic name from network name automatically if the topic configuration is
left empty.

Fixes #8510
  • Loading branch information
masih committed Apr 21, 2022
1 parent 48ffb15 commit 1f7569d
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 15 deletions.
6 changes: 4 additions & 2 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,13 @@
#EntriesChunkSize = 16384

# TopicName sets the topic name on which the changes to the advertised content are announced.
# Defaults to '/indexer/ingest/mainnet' if not specified.
# If not explicitly specified, the topic name is automatically inferred from the network name
# in following format: '/indexer/ingest/<network-name>'
# Defaults to empty, which implies the topic name is inferred from network name.
#
# type: string
# env var: LOTUS_INDEXPROVIDER_TOPICNAME
#TopicName = "/indexer/ingest/mainnet"
#TopicName = ""

# PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine
# starts. By default, the cache is rehydrated from previously cached entries stored in
Expand Down
6 changes: 4 additions & 2 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,10 @@ func DefaultStorageMiner() *StorageMiner {
Enable: true,
EntriesCacheCapacity: 1024,
EntriesChunkSize: 16384,
TopicName: "/indexer/ingest/mainnet",
PurgeCacheOnStart: false,
// The default empty TopicName means it is inferred from network name, in the following
// format: "/indexer/ingest/<network-name>"
TopicName: "",
PurgeCacheOnStart: false,
},

Subsystems: MinerSubsystemConfig{
Expand Down
4 changes: 2 additions & 2 deletions node/config/def_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) {
require.True(t, reflect.DeepEqual(c, c2))
}

func TestDefaultStorageMiner_SetsIndexIngestTopic(t *testing.T) {
func TestDefaultStorageMiner_IsEmpty(t *testing.T) {
subject := DefaultStorageMiner()
require.True(t, subject.IndexProvider.Enable)
require.Equal(t, "/indexer/ingest/mainnet", subject.IndexProvider.TopicName)
require.Equal(t, "", subject.IndexProvider.TopicName)
}
4 changes: 3 additions & 1 deletion node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ type IndexProviderConfig struct {
EntriesChunkSize int

// TopicName sets the topic name on which the changes to the advertised content are announced.
// Defaults to '/indexer/ingest/mainnet' if not specified.
// If not explicitly specified, the topic name is automatically inferred from the network name
// in following format: '/indexer/ingest/<network-name>'
// Defaults to empty, which implies the topic name is inferred from network name.
TopicName string

// PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine
Expand Down
28 changes: 21 additions & 7 deletions node/modules/storageminer_idxprov.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/filecoin-project/go-address"
provider "github.com/filecoin-project/index-provider"
"github.com/filecoin-project/index-provider/engine"
"github.com/filecoin-project/lotus/build"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host"
Expand All @@ -24,33 +25,45 @@ type IdxProv struct {
Datastore dtypes.MetadataDS
}

func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub) (provider.Interface, error) {
func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub, nn dtypes.NetworkName) (provider.Interface, error) {
return func(args IdxProv, marketHost host.Host, dt dtypes.ProviderDataTransfer, maddr dtypes.MinerAddress, ps *pubsub.PubSub, nn dtypes.NetworkName) (provider.Interface, error) {
topicName := cfg.TopicName
// If indexer topic name is left empty, infer it from the network name.
if topicName == "" {
// Use the same mechanism as the Dependency Injection (DI) to construct the topic name,
// so that we are certain it is consistent with the name allowed by the subscription
// filter.
//
// See: lp2p.GossipSub.
topicName = build.IndexerIngestTopic(nn)
log.Debugw("Inferred indexer topic from network name", "topic", topicName)
}

ipds := namespace.Wrap(args.Datastore, datastore.NewKey("/index-provider"))
var opts = []engine.Option{
engine.WithDatastore(ipds),
engine.WithHost(marketHost),
engine.WithRetrievalAddrs(marketHost.Addrs()...),
engine.WithEntriesCacheCapacity(cfg.EntriesCacheCapacity),
engine.WithEntriesChunkSize(cfg.EntriesChunkSize),
engine.WithTopicName(cfg.TopicName),
engine.WithTopicName(topicName),
engine.WithPurgeCacheOnStart(cfg.PurgeCacheOnStart),
}

llog := log.With(
"idxProvEnabled", cfg.Enable,
"pid", marketHost.ID(),
"topic", cfg.TopicName,
"topic", topicName,
"retAddrs", marketHost.Addrs())
// If announcements to the network are enabled, then set options for datatransfer publisher.
if cfg.Enable {
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider
// engine would create its own instance of pubsub down the line in go-legs, which has
// no validators by default.
t, err := ps.Join(cfg.TopicName)
t, err := ps.Join(topicName)
if err != nil {
llog.Errorw("Failed to join indexer topic", "err", err)
return nil, xerrors.Errorf("joining indexer topic %s: %w", cfg.TopicName, err)
return nil, xerrors.Errorf("joining indexer topic %s: %w", topicName, err)
}

// Get the miner ID and set as extra gossip data.
Expand All @@ -62,9 +75,10 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo
engine.WithExtraGossipData(ma.Bytes()),
engine.WithTopic(t),
)
llog = llog.With("extraGossipData", ma)
llog = llog.With("extraGossipData", ma, "publisher", "data-transfer")
} else {
opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))
llog = llog.With("publisher", "none")
}

// Instantiate the index provider engine.
Expand Down

0 comments on commit 1f7569d

Please sign in to comment.