Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peerstore: add contexts to all methods #2312

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"context"
"crypto/rand"
"errors"
"fmt"
Expand Down Expand Up @@ -150,10 +151,10 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
return nil, err
}

if err := cfg.Peerstore.AddPrivKey(pid, cfg.PeerKey); err != nil {
if err := cfg.Peerstore.AddPrivKey(context.Background(), pid, cfg.PeerKey); err != nil {
return nil, err
}
if err := cfg.Peerstore.AddPubKey(pid, cfg.PeerKey.GetPublic()); err != nil {
if err := cfg.Peerstore.AddPubKey(context.Background(), pid, cfg.PeerKey.GetPublic()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -194,7 +195,7 @@ func (cfg *Config) addTransports(h host.Host) error {
fx.Supply(cfg.Muxers),
fx.Supply(h.ID()),
fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(context.Background(), h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
Expand Down
6 changes: 4 additions & 2 deletions core/peerstore/helpers.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package peerstore

import (
"context"

"github.com/libp2p/go-libp2p/core/peer"
)

// AddrInfos returns an AddrInfo for each specified peer ID, in-order.
func AddrInfos(ps Peerstore, peers []peer.ID) []peer.AddrInfo {
func AddrInfos(ctx context.Context, ps Peerstore, peers []peer.ID) []peer.AddrInfo {
pi := make([]peer.AddrInfo, len(peers))
for i, p := range peers {
pi[i] = ps.PeerInfo(p)
pi[i] = ps.PeerInfo(ctx, p)
}
return pi
}
58 changes: 29 additions & 29 deletions core/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ type Peerstore interface {
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
PeerInfo(peer.ID) peer.AddrInfo
PeerInfo(context.Context, peer.ID) peer.AddrInfo

// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
Peers(context.Context) peer.IDSlice
}

// PeerMetadata can handle values of any type. Serializing values is
Expand All @@ -77,47 +77,47 @@ type PeerMetadata interface {
// Get / Put is a simple registry for other peer-related key/value pairs.
// If we find something we use often, it should become its own set of
// methods. This is a last resort.
Get(p peer.ID, key string) (interface{}, error)
Put(p peer.ID, key string, val interface{}) error
Get(ctx context.Context, p peer.ID, key string) (interface{}, error)
Put(ctx context.Context, p peer.ID, key string, val interface{}) error

// RemovePeer removes all values stored for a peer.
RemovePeer(peer.ID)
RemovePeer(context.Context, peer.ID)
}

// AddrBook holds the multiaddrs of peers.
type AddrBook interface {
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
AddAddr(context.Context, peer.ID, ma.Multiaddr, time.Duration)

// AddAddrs gives this AddrBook addresses to use, with a given ttl
// (time-to-live), after which the address is no longer valid.
// If the manager has a longer TTL, the operation is a no-op for that address
AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
AddAddrs(context.Context, peer.ID, []ma.Multiaddr, time.Duration)

// SetAddr calls mgr.SetAddrs(p, addr, ttl)
SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
SetAddr(context.Context, peer.ID, ma.Multiaddr, time.Duration)

// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
// This is used when we receive the best estimate of the validity of an address.
SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
SetAddrs(context.Context, peer.ID, []ma.Multiaddr, time.Duration)

// UpdateAddrs updates the addresses associated with the given peer that have
// the given oldTTL to have the given newTTL.
UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration)
UpdateAddrs(context.Context, peer.ID, time.Duration, time.Duration)

// Addrs returns all known (and valid) addresses for a given peer.
Addrs(p peer.ID) []ma.Multiaddr
Addrs(context.Context, peer.ID) []ma.Multiaddr

// AddrStream returns a channel that gets all addresses for a given
// peer sent on it. If new addresses are added after the call is made
// they will be sent along through the channel as well.
AddrStream(context.Context, peer.ID) <-chan ma.Multiaddr

// ClearAddresses removes all previously stored addresses.
ClearAddrs(p peer.ID)
ClearAddrs(context.Context, peer.ID)

// PeersWithAddrs returns all of the peer IDs stored in the AddrBook.
PeersWithAddrs() peer.IDSlice
PeersWithAddrs(context.Context) peer.IDSlice
}

// CertifiedAddrBook manages "self-certified" addresses for remote peers.
Expand Down Expand Up @@ -172,12 +172,12 @@ type CertifiedAddrBook interface {
// AddrBook.SetAddrs will be ignored. AddrBook.SetAddrs may still be used
// to update the TTL of certified addresses that have previously been
// added via ConsumePeerRecord.
ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error)
ConsumePeerRecord(context.Context, *record.Envelope, time.Duration) (accepted bool, err error)

// GetPeerRecord returns a Envelope containing a PeerRecord for the
// given peer id, if one exists.
// Returns nil if no signed PeerRecord exists for the peer.
GetPeerRecord(p peer.ID) *record.Envelope
GetPeerRecord(context.Context, peer.ID) *record.Envelope
}

// GetCertifiedAddrBook is a helper to "upcast" an AddrBook to a
Expand All @@ -196,24 +196,24 @@ func GetCertifiedAddrBook(ab AddrBook) (cab CertifiedAddrBook, ok bool) {
// KeyBook tracks the keys of Peers.
type KeyBook interface {
// PubKey stores the public key of a peer.
PubKey(peer.ID) ic.PubKey
PubKey(context.Context, peer.ID) ic.PubKey

// AddPubKey stores the public key of a peer.
AddPubKey(peer.ID, ic.PubKey) error
AddPubKey(context.Context, peer.ID, ic.PubKey) error

// PrivKey returns the private key of a peer, if known. Generally this might only be our own
// private key, see
// https://discuss.libp2p.io/t/what-is-the-purpose-of-having-map-peer-id-privatekey-in-peerstore/74.
PrivKey(peer.ID) ic.PrivKey
PrivKey(context.Context, peer.ID) ic.PrivKey

// AddPrivKey stores the private key of a peer.
AddPrivKey(peer.ID, ic.PrivKey) error
AddPrivKey(context.Context, peer.ID, ic.PrivKey) error

// PeersWithKeys returns all the peer IDs stored in the KeyBook.
PeersWithKeys() peer.IDSlice
PeersWithKeys(context.Context) peer.IDSlice

// RemovePeer removes all keys associated with a peer.
RemovePeer(peer.ID)
RemovePeer(context.Context, peer.ID)
}

// Metrics tracks metrics across a set of peers.
Expand All @@ -226,25 +226,25 @@ type Metrics interface {
LatencyEWMA(peer.ID) time.Duration

// RemovePeer removes all metrics stored for a peer.
RemovePeer(peer.ID)
RemovePeer(context.Context, peer.ID)
}

// ProtoBook tracks the protocols supported by peers.
type ProtoBook interface {
GetProtocols(peer.ID) ([]protocol.ID, error)
AddProtocols(peer.ID, ...protocol.ID) error
SetProtocols(peer.ID, ...protocol.ID) error
RemoveProtocols(peer.ID, ...protocol.ID) error
GetProtocols(context.Context, peer.ID) ([]protocol.ID, error)
AddProtocols(context.Context, peer.ID, ...protocol.ID) error
SetProtocols(context.Context, peer.ID, ...protocol.ID) error
RemoveProtocols(context.Context, peer.ID, ...protocol.ID) error

// SupportsProtocols returns the set of protocols the peer supports from among the given protocols.
// If the returned error is not nil, the result is indeterminate.
SupportsProtocols(peer.ID, ...protocol.ID) ([]protocol.ID, error)
SupportsProtocols(context.Context, peer.ID, ...protocol.ID) ([]protocol.ID, error)

// FirstSupportedProtocol returns the first protocol that the peer supports among the given protocols.
// If the peer does not support any of the given protocols, this function will return an empty protocol.ID and a nil error.
// If the returned error is not nil, the result is indeterminate.
FirstSupportedProtocol(peer.ID, ...protocol.ID) (protocol.ID, error)
FirstSupportedProtocol(context.Context, peer.ID, ...protocol.ID) (protocol.ID, error)

// RemovePeer removes all protocols associated with a peer.
RemovePeer(peer.ID)
RemovePeer(context.Context, peer.ID)
}
2 changes: 1 addition & 1 deletion p2p/discovery/backoff/backoffconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func getNetHosts(t *testing.T, n int) []host.Host {
func loadCh(peers []host.Host) <-chan peer.AddrInfo {
ch := make(chan peer.AddrInfo, len(peers))
for _, p := range peers {
ch <- p.Peerstore().PeerInfo(p.ID())
ch <- p.Peerstore().PeerInfo(context.Background(), p.ID())
}
close(ch)
return ch
Expand Down
8 changes: 4 additions & 4 deletions p2p/host/autonat/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (as *AmbientAutoNAT) background() {
as.confidence--
}
case event.EvtPeerIdentificationCompleted:
if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 {
if s, err := as.host.Peerstore().SupportsProtocols(context.Background(), e.Peer, AutoNATProto); err == nil && len(s) > 0 {
currentStatus := *as.status.Load()
if currentStatus == network.ReachabilityUnknown {
as.tryProbe(e.Peer)
Expand Down Expand Up @@ -367,7 +367,7 @@ func (as *AmbientAutoNAT) tryProbe(p peer.ID) bool {
}
as.cleanupRecentProbes()

info := as.host.Peerstore().PeerInfo(p)
info := as.host.Peerstore().PeerInfo(context.Background(), p)

if !as.config.dialPolicy.skipPeer(info.Addrs) {
as.recentProbes[p] = time.Now()
Expand Down Expand Up @@ -402,9 +402,9 @@ func (as *AmbientAutoNAT) getPeerToProbe() peer.ID {
candidates := make([]peer.ID, 0, len(peers))

for _, p := range peers {
info := as.host.Peerstore().PeerInfo(p)
info := as.host.Peerstore().PeerInfo(context.Background(), p)
// Exclude peers which don't support the autonat protocol.
if proto, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto); len(proto) == 0 || err != nil {
if proto, err := as.host.Peerstore().SupportsProtocols(context.Background(), p, AutoNATProto); len(proto) == 0 || err != nil {
continue
}

Expand Down
8 changes: 4 additions & 4 deletions p2p/host/autonat/autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ func makeAutoNATServicePublic(t *testing.T) host.Host {

func makeAutoNAT(t *testing.T, ash host.Host) (host.Host, AutoNAT) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute)
h.Peerstore().AddProtocols(ash.ID(), AutoNATProto)
h.Peerstore().AddAddrs(context.Background(), ash.ID(), ash.Addrs(), time.Minute)
h.Peerstore().AddProtocols(context.Background(), ash.ID(), AutoNATProto)
a, _ := New(h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay())
a.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
a.(*AmbientAutoNAT).config.throttlePeerPeriod = 100 * time.Millisecond
return h, a
}

func identifyAsServer(server, recip host.Host) {
recip.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Minute)
recip.Peerstore().AddProtocols(server.ID(), AutoNATProto)
recip.Peerstore().AddAddrs(context.Background(), server.ID(), server.Addrs(), time.Minute)
recip.Peerstore().AddProtocols(context.Background(), server.ID(), AutoNATProto)

}

Expand Down
8 changes: 4 additions & 4 deletions p2p/host/autonat/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ func (as *autoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
ctx, cancel := context.WithTimeout(context.Background(), as.config.dialTimeout)
defer cancel()

as.config.dialer.Peerstore().ClearAddrs(pi.ID)
as.config.dialer.Peerstore().ClearAddrs(ctx, pi.ID)

as.config.dialer.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
as.config.dialer.Peerstore().AddAddrs(ctx, pi.ID, pi.Addrs, peerstore.TempAddrTTL)

defer func() {
as.config.dialer.Peerstore().ClearAddrs(pi.ID)
as.config.dialer.Peerstore().RemovePeer(pi.ID)
as.config.dialer.Peerstore().ClearAddrs(ctx, pi.ID)
as.config.dialer.Peerstore().RemovePeer(ctx, pi.ID)
}()

conn, err := as.config.dialer.DialPeer(ctx, pi.ID)
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/autonat/test/autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestAutonatRoundtrip(t *testing.T) {
t.Fatal(err)
}

client.Peerstore().AddAddrs(service.ID(), service.Addrs(), time.Hour)
require.NoError(t, client.Connect(context.Background(), service.Peerstore().PeerInfo(service.ID())))
client.Peerstore().AddAddrs(context.Background(), service.ID(), service.Addrs(), time.Hour)
require.NoError(t, client.Connect(context.Background(), service.Peerstore().PeerInfo(context.Background(), service.ID())))

cSub, err := client.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR
return false, ctx.Err()
}

protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv2)
protos, err := rf.host.Peerstore().SupportsProtocols(ctx, pi.ID, protoIDv2)
if err != nil {
return false, fmt.Errorf("error checking relay protocol support for peer %s: %w", pi.ID, err)
}
Expand Down Expand Up @@ -734,7 +734,7 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
// add relay specific addrs to the list
relayAddrCnt := 0
for p := range rf.relays {
addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(p))
addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(context.Background(), p))
relayAddrCnt += len(addrs)
circuit := ma.StringCast(fmt.Sprintf("/p2p/%s/p2p-circuit", p.Pretty()))
for _, addr := range addrs {
Expand Down
16 changes: 8 additions & 8 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
}
h.caBook = cab

h.signKey = h.Peerstore().PrivKey(h.ID())
h.signKey = h.Peerstore().PrivKey(context.Background(), h.ID())
if h.signKey == nil {
return nil, errors.New("unable to access host key")
}
Expand All @@ -221,7 +221,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
if err != nil {
return nil, fmt.Errorf("failed to create signed record for self: %w", err)
}
if _, err := cab.ConsumePeerRecord(ev, peerstore.PermanentAddrTTL); err != nil {
if _, err := cab.ConsumePeerRecord(context.Background(), ev, peerstore.PermanentAddrTTL); err != nil {
return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err)
}
}
Expand Down Expand Up @@ -515,7 +515,7 @@ func (h *BasicHost) background() {
changeEvt.SignedPeerRecord = sr

// persist the signed record to the peerstore
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
if _, err := h.caBook.ConsumePeerRecord(context.Background(), sr, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}
Expand Down Expand Up @@ -656,7 +656,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I
return nil, ctx.Err()
}

pref, err := h.preferredProtocol(p, pids)
pref, err := h.preferredProtocol(ctx, p, pids)
if err != nil {
_ = s.Reset()
return nil, err
Expand Down Expand Up @@ -692,12 +692,12 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I
}

s.SetProtocol(selected)
h.Peerstore().AddProtocols(p, selected)
h.Peerstore().AddProtocols(ctx, p, selected)
return s, nil
}

func (h *BasicHost) preferredProtocol(p peer.ID, pids []protocol.ID) (protocol.ID, error) {
supported, err := h.Peerstore().SupportsProtocols(p, pids...)
func (h *BasicHost) preferredProtocol(ctx context.Context, p peer.ID, pids []protocol.ID) (protocol.ID, error) {
supported, err := h.Peerstore().SupportsProtocols(ctx, p, pids...)
if err != nil {
return "", err
}
Expand All @@ -716,7 +716,7 @@ func (h *BasicHost) preferredProtocol(p peer.ID, pids []protocol.ID) (protocol.I
// It will also resolve any /dns4, /dns6, and /dnsaddr addresses.
func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
// absorb addresses into peerstore
h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
h.Peerstore().AddAddrs(ctx, pi.ID, pi.Addrs, peerstore.TempAddrTTL)

forceDirect, _ := network.GetForceDirectDial(ctx)
if !forceDirect {
Expand Down
Loading