Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy up bootstrapping #225

Merged
merged 7 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ type IpfsDHT struct {
protocols []protocol.ID // DHT protocols
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*IpfsDHT)(nil)
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
_ routing.IpfsRouting = (*IpfsDHT)(nil)
_ routing.PeerRouting = (*IpfsDHT)(nil)
_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
_ routing.ValueStore = (*IpfsDHT)(nil)
)

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
Expand Down
166 changes: 62 additions & 104 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"time"

u "github.com/ipfs/go-ipfs-util"
goprocess "github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/goprocess/periodic"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
routing "github.com/libp2p/go-libp2p-routing"
)

Expand Down Expand Up @@ -39,87 +38,73 @@ var DefaultBootstrapConfig = BootstrapConfig{
Timeout: time.Duration(10 * time.Second),
}

// Bootstrap ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
// the default bootstrap config.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig)
if err != nil {
return err
}

// wait till ctx or dht.Context exits.
// we have to do it this way to satisfy the Routing interface (contexts)
go func() {
defer proc.Close()
select {
case <-ctx.Done():
case <-dht.Context().Done():
}
}()

return nil
return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
}

// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// BootstrapWithConfig returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
// Runs cfg.Queries bootstrap queries every cfg.Period.
func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
// Because this method is not synchronous, we have to duplicate sanity
// checks on the config so that callers aren't oblivious.
if cfg.Queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}

proc := dht.Process().Go(func(p goprocess.Process) {
<-p.Go(dht.bootstrapWorker(cfg)).Closed()
go func() {
for {
err := dht.runBootstrap(ctx, cfg)
if err != nil {
log.Warningf("error bootstrapping: %s", err)
}
select {
case <-time.After(cfg.Period):
<-p.Go(dht.bootstrapWorker(cfg)).Closed()
case <-p.Closing():
case <-ctx.Done():
return
}
}
})

return proc, nil
}()
return nil
}

// SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// SignalBootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
// This is a synchronous bootstrap. cfg.Queries queries will run each with a
// timeout of cfg.Timeout. cfg.Period is not used.
func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
hsanjuan marked this conversation as resolved.
Show resolved Hide resolved
if cfg.Queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
return dht.runBootstrap(ctx, cfg)
}

if signal == nil {
return nil, fmt.Errorf("invalid signal: %v", signal)
}

proc := periodicproc.Ticker(signal, dht.bootstrapWorker(cfg))

return proc, nil
// newRandomPeerId fabricates a peer ID from random bytes, for use as a random
// walk target. It is not the ID of any real peer.
func newRandomPeerId() peer.ID {
	// SHA256 is the default. TODO: Use a more canonical way to generate
	// random IDs.
	buf := make([]byte, 32)
	rand.Read(buf)
	// TODO: Feed this directly into the multihash instead of hashing it.
	return peer.ID(u.Hash(buf))
}

func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
return func(worker goprocess.Process) {
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?
// Traverse the DHT toward the given ID. Currently this just delegates to
// FindPeer, returning whatever peer info that lookup produces.
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
	// TODO: Extract the query action (traversal logic?) inside FindPeer,
	// don't actually call through the FindPeer machinery, which can return
	// things out of the peer store etc.
	return dht.FindPeer(ctx, target)
}

ctx := dht.Context()
if err := dht.runBootstrap(ctx, cfg); err != nil {
log.Warning(err)
// A bootstrapping error is important to notice but not fatal.
}
// Traverse the DHT toward a random ID.
func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
	target := newRandomPeerId()
	p, err := dht.walk(ctx, target)
	if err == routing.ErrNotFound {
		// Not finding the randomly generated ID is the expected outcome.
		return nil
	}
	if err == nil {
		// We found a peer from a randomly generated ID. This should be very
		// unlikely.
		log.Warningf("random walk toward %s actually found peer: %s", target, p)
		return nil
	}
	return err
}

Expand All @@ -132,51 +117,24 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
defer bslog("end")
defer log.EventBegin(ctx, "dhtRunBootstrap").Done()

var merr u.MultiErr

randomID := func() peer.ID {
// 16 random bytes is not a valid peer id. it may be fine because
// the dht will rehash to its own keyspace anyway.
id := make([]byte, 16)
rand.Read(id)
id = u.Hash(id)
return peer.ID(id)
}

// bootstrap sequentially, as results will compound
runQuery := func(ctx context.Context, id peer.ID) {
doQuery := func(n int, target string, f func(context.Context) error) error {
log.Debugf("Bootstrapping query (%d/%d) to %s", n, cfg.Queries, target)
ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
defer cancel()

p, err := dht.FindPeer(ctx, id)
if err == routing.ErrNotFound {
// this isn't an error. this is precisely what we expect.
} else if err != nil {
merr = append(merr, err)
} else {
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
log.Warningf("%s", err)
merr = append(merr, err)
}
return f(ctx)
}

// these should be parallel normally. but can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that.
for i := 0; i < cfg.Queries; i++ {
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
runQuery(ctx, id)
// Do all but one of the bootstrap queries as random walks.
for i := 1; i < cfg.Queries; i++ {
err := doQuery(i, "random ID", dht.randomWalk)
if err != nil {
return err
}
}

// Find self to distribute peer info to our neighbors.
// Do this after bootstrapping.
log.Debugf("Bootstrapping query to self: %s", dht.self)
runQuery(ctx, dht.self)

if len(merr) > 0 {
return merr
}
return nil
return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), func(ctx context.Context) error {
_, err := dht.walk(ctx, dht.self)
return err
})
}
18 changes: 2 additions & 16 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,23 +679,10 @@ func TestPeriodicBootstrap(t *testing.T) {
}
}()

signals := []chan time.Time{}

var cfg BootstrapConfig
cfg = DefaultBootstrapConfig
cfg.Queries = 5

// kick off periodic bootstrappers with instrumented signals.
for _, dht := range dhts {
s := make(chan time.Time)
signals = append(signals, s)
proc, err := dht.BootstrapOnSignal(cfg, s)
if err != nil {
t.Fatal(err)
}
defer proc.Close()
}

t.Logf("dhts are not connected. %d", nDHTs)
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
Expand All @@ -721,9 +708,8 @@ func TestPeriodicBootstrap(t *testing.T) {
}

t.Logf("bootstrapping them so they find each other. %d", nDHTs)
now := time.Now()
for _, signal := range signals {
go func(s chan time.Time) { s <- now }(signal)
for _, dht := range dhts {
hsanjuan marked this conversation as resolved.
Show resolved Hide resolved
go dht.BootstrapOnce(ctx, cfg)
}

// this is async, and we dont know when it's finished with one cycle, so keep checking
Expand Down
3 changes: 2 additions & 1 deletion routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
routing "github.com/libp2p/go-libp2p-routing"
notif "github.com/libp2p/go-libp2p-routing/notifications"
ropts "github.com/libp2p/go-libp2p-routing/options"
"github.com/pkg/errors"
anacrolix marked this conversation as resolved.
Show resolved Hide resolved
)

// asyncQueryBuffer is the size of buffered channels in async queries. This
Expand Down Expand Up @@ -583,7 +584,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo

peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return pstore.PeerInfo{}, kb.ErrLookupFailure
return pstore.PeerInfo{}, errors.WithStack(kb.ErrLookupFailure)
}

// Sanity...
Expand Down