Skip to content

Commit

Permalink
Merge branch 'main' into fix/mfs-cache-3
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan authored Dec 17, 2024
2 parents fbe0229 + a83de68 commit cbe230a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 29 deletions.
10 changes: 5 additions & 5 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type PeerManager struct {
createPeerQueue PeerQueueFactory
ctx context.Context

psLk sync.Mutex
psLk sync.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

Expand Down Expand Up @@ -121,9 +121,9 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
func (pm *PeerManager) ResponseReceived(p peer.ID, ks []cid.Cid) {
pm.pqLk.Lock()
pm.pqLk.RLock()
pq, ok := pm.peerQueues[p]
pm.pqLk.Unlock()
pm.pqLk.RUnlock()

if ok {
pq.ResponseReceived(ks)
Expand Down Expand Up @@ -233,8 +233,8 @@ func (pm *PeerManager) UnregisterSession(ses uint64) {
// signalAvailability is called when a peer's connectivity changes.
// It informs interested sessions.
func (pm *PeerManager) signalAvailability(p peer.ID, isConnected bool) {
pm.psLk.Lock()
defer pm.psLk.Unlock()
pm.psLk.RLock()
defer pm.psLk.RUnlock()

sesIds, ok := pm.peerSessions[p]
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions bitswap/client/internal/session/peerresponsetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ func (prt *peerResponseTracker) getPeerCount(p peer.ID) int {
// will be chosen
return prt.firstResponder[p] + 1
}

func (prt *peerResponseTracker) remove(p peer.ID) {
delete(prt.firstResponder, p)
}
12 changes: 4 additions & 8 deletions bitswap/client/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,7 @@ func (sws *sessionWantSender) addChangeNonBlocking(c change) {
case sws.changes <- c:
default:
// changes channel is full, so add change in a go routine instead
go func() {
select {
case sws.changes <- c:
case <-sws.ctx.Done():
}
}()
go sws.addChange(c)
}
}

Expand Down Expand Up @@ -326,14 +321,15 @@ func (sws *sessionWantSender) processAvailability(availability map[peer.ID]bool)
if wasAvailable {
stateChange = true
newlyUnavailable = append(newlyUnavailable, p)
// Remove count of first responses from peer.
sws.peerRspTrkr.remove(p)
}
}

// If the state has changed
if stateChange {
sws.updateWantsPeerAvailability(p, isNowAvailable)
// Reset the count of consecutive DONT_HAVEs received from the
// peer
// Reset count of consecutive DONT_HAVEs received from the peer.
delete(sws.peerConsecutiveDontHaves, p)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (sim *SessionInterestManager) RemoveSession(ses uint64) []cid.Cid {
defer sim.lk.Unlock()

// The keys that no session is interested in
deletedKs := make([]cid.Cid, 0)
var deletedKs []cid.Cid

// For each known key
for c := range sim.wants {
Expand Down Expand Up @@ -119,18 +119,19 @@ func (sim *SessionInterestManager) RemoveSessionInterested(ses uint64, ks []cid.
// The session calls FilterSessionInterested() to filter the sets of keys for
// those that the session is interested in
func (sim *SessionInterestManager) FilterSessionInterested(ses uint64, ksets ...[]cid.Cid) [][]cid.Cid {
kres := make([][]cid.Cid, len(ksets))

sim.lk.RLock()
defer sim.lk.RUnlock()

// For each set of keys
kres := make([][]cid.Cid, len(ksets))
for i, ks := range ksets {
// The set of keys that at least one session is interested in
has := make([]cid.Cid, 0, len(ks))
var has []cid.Cid

// For each key in the list
for _, c := range ks {
// If there is a session that's interested, add the key to the set
// If the session is interested, add the key to the set
if _, ok := sim.wants[c][ses]; ok {
has = append(has, c)
}
Expand All @@ -144,7 +145,6 @@ func (sim *SessionInterestManager) FilterSessionInterested(ses uint64, ksets ...
// unwanted blocks
func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]blocks.Block, []blocks.Block) {
sim.lk.RLock()
defer sim.lk.RUnlock()

// Get the wanted block keys as a set
wantedKs := cid.NewSet()
Expand All @@ -160,6 +160,8 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b
}
}

sim.lk.RUnlock()

// Separate the blocks into wanted and unwanted
wantedBlks := make([]blocks.Block, 0, len(blks))
notWantedBlks := make([]blocks.Block, 0)
Expand All @@ -175,23 +177,22 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b

// When the SessionManager receives a message it calls InterestedSessions() to
// find out which sessions are interested in the message.
func (sim *SessionInterestManager) InterestedSessions(blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []uint64 {
sim.lk.RLock()
defer sim.lk.RUnlock()
func (sim *SessionInterestManager) InterestedSessions(keySets ...[]cid.Cid) []uint64 {
sesSet := make(map[uint64]struct{})

ks := make([]cid.Cid, 0, len(blks)+len(haves)+len(dontHaves))
ks = append(ks, blks...)
ks = append(ks, haves...)
ks = append(ks, dontHaves...)
sim.lk.RLock()

// Create a set of sessions that are interested in the keys
sesSet := make(map[uint64]struct{})
for _, c := range ks {
for s := range sim.wants[c] {
sesSet[s] = struct{}{}
for _, keySet := range keySets {
for _, c := range keySet {
for s := range sim.wants[c] {
sesSet[s] = struct{}{}
}
}
}

sim.lk.RUnlock()

// Convert the set into a list
ses := make([]uint64, 0, len(sesSet))
for s := range sesSet {
Expand Down

0 comments on commit cbe230a

Please sign in to comment.