Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
refactor: move things around in session / wants
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Aug 16, 2019
1 parent ea46d80 commit b5a0a23
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 59 deletions.
73 changes: 35 additions & 38 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *Session) IsWanted(c cid.Cid) bool {

// InterestedIn returns true if this session has ever requested the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
return s.sw.IsInterested(c)
return s.sw.InterestedIn(c)
}

// GetBlock fetches a single block.
Expand Down Expand Up @@ -196,12 +196,6 @@ func (s *Session) run(ctx context.Context) {
for {
select {
case rcv := <-s.incoming:
s.cancelIncoming(ctx, rcv)
// Record statistics only if the blocks came from the network
// (blocks can also be received from the local node)
if rcv.from != "" {
s.updateReceiveCounters(ctx, rcv)
}
s.handleIncoming(ctx, rcv)
case keys := <-s.newReqs:
s.wantBlocks(ctx, keys)
Expand All @@ -222,27 +216,6 @@ func (s *Session) run(ctx context.Context) {
}
}

func (s *Session) cancelIncoming(ctx context.Context, rcv rcvFrom) {
// We've received the blocks so we can cancel any outstanding wants for them
wanted := make([]cid.Cid, 0, len(rcv.ks))
for _, k := range rcv.ks {
if s.sw.IsWanted(b.Cid()) {
wanted = append(wanted, k)
}
}
s.pm.RecordCancels(wanted)
s.wm.CancelWants(s.ctx, wanted, nil, s.id)
}

func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
s.idleTick.Stop()

// Process the received blocks
s.processIncoming(ctx, rcv.ks)

s.resetIdleTick()
}

func (s *Session) handleIdleTick(ctx context.Context) {
live := s.sw.PrepareBroadcast()

Expand Down Expand Up @@ -283,21 +256,28 @@ func (s *Session) handleShutdown() {
s.wm.CancelWants(s.ctx, live, nil, s.id)
}

func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid) {
wanted, totalLatency := s.sw.BlocksReceived(ks)
if wanted.Len() == 0 {
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
// Record statistics only if the blocks came from the network
// (blocks can also be received from the local node)
if rcv.from != "" {
s.updateReceiveCounters(ctx, rcv)
}

// Update the want list
wanted, totalLatency := s.sw.BlocksReceived(rcv.ks)
if len(wanted) == 0 {
return
}

// Keep track of the total number of blocks received and total latency
s.fetchcnt += wanted.Len()
s.latTotal += totalLatency
// We've received the blocks so we can cancel any outstanding wants for them
s.cancelIncoming(ctx, wanted)

// We've received new wanted blocks, so reset the number of ticks
// that have occurred since the last new block
s.consecutiveTicks = 0
s.idleTick.Stop()

s.wantBlocks(ctx, nil)
// Process the received blocks
s.processIncoming(ctx, wanted, totalLatency)

s.resetIdleTick()
}

func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
Expand All @@ -315,6 +295,23 @@ func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
}
}

func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) {
s.pm.RecordCancels(ks)
s.wm.CancelWants(s.ctx, ks, nil, s.id)
}

func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) {
// Keep track of the total number of blocks received and total latency
s.fetchcnt += len(ks)
s.latTotal += totalLatency

// We've received new wanted blocks, so reset the number of ticks
// that have occurred since the last new block
s.consecutiveTicks = 0

s.wantBlocks(ctx, nil)
}

func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
ks := s.sw.GetNextWants(s.wantLimit(), newks)
if len(ks) == 0 {
Expand Down
27 changes: 6 additions & 21 deletions session/sessionwants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (sw *sessionWants) Stats() string {
return fmt.Sprintf("%d past / %d pending / %d live", sw.pastWants.Len(), sw.toFetch.Len(), len(sw.liveWants))
}

func (sw *sessionWants) BlocksReceived(cids []cid.Cid) (*cid.Set, time.Duration) {
func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) {
sw.liveWantsLk.Lock()
defer sw.liveWantsLk.Unlock()
sw.toFetchLk.Lock()
Expand All @@ -42,9 +42,11 @@ func (sw *sessionWants) BlocksReceived(cids []cid.Cid) (*cid.Set, time.Duration)
defer sw.pastWantsLk.Unlock()

totalLatency := time.Duration(0)
wanted := sw.unlockedGetWanted(cids)
wanted := make([]cid.Cid, 0, len(cids))
for _, c := range cids {
if wanted.Has(c) {
if sw.unlockedIsWanted(c) {
wanted = append(wanted, c)

// If the block CID was in the live wants queue, remove it
tval, ok := sw.liveWants[c]
if ok {
Expand All @@ -63,16 +65,6 @@ func (sw *sessionWants) BlocksReceived(cids []cid.Cid) (*cid.Set, time.Duration)
return wanted, totalLatency
}

func (sw *sessionWants) unlockedGetWanted(cids []cid.Cid) *cid.Set {
cset := cid.NewSet()
for _, c := range cids {
if sw.unlockedIsWanted(c) {
cset.Add(c)
}
}
return cset
}

func (sw *sessionWants) GetNextWants(limit int, newWants []cid.Cid) []cid.Cid {
sw.liveWantsLk.Lock()
defer sw.liveWantsLk.Unlock()
Expand Down Expand Up @@ -118,7 +110,7 @@ func (sw *sessionWants) SplitIsWasWanted(cids []cid.Cid) ([]cid.Cid, []cid.Cid)
return isWanted, wasWanted
}

func (sw *sessionWants) IsInterested(c cid.Cid) bool {
func (sw *sessionWants) InterestedIn(c cid.Cid) bool {
sw.liveWantsLk.RLock()
defer sw.liveWantsLk.RUnlock()
sw.toFetchLk.RLock()
Expand Down Expand Up @@ -146,13 +138,6 @@ func (sw *sessionWants) unlockedIsWanted(c cid.Cid) bool {
return ok
}

func (sw *sessionWants) WasWanted(c cid.Cid) bool {
sw.pastWantsLk.RLock()
defer sw.pastWantsLk.RUnlock()

return sw.pastWants.Has(c)
}

func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
sw.liveWantsLk.Lock()
defer sw.liveWantsLk.Unlock()
Expand Down

0 comments on commit b5a0a23

Please sign in to comment.