Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

output debug info to track down wantlist leak #331

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions internal/lu/lu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package lu

import (
"fmt"
"os"
"path/filepath"

logging "github.com/ipfs/go-log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// log is the standard bitswap session logger; sflog is its non-sugared
// (structured) counterpart.
var log = logging.Logger("bs:sess")
var sflog = log.Desugar()

// speclog is the dedicated wantlist-leak debug logger. It is assigned in
// init and exposed to other packages via GetSpecialLogger.
var speclog *zap.SugaredLogger

// init builds the package-level special debug logger. Output goes to zap's
// default production targets unless the BS_SPECIAL_LOG_FILE environment
// variable names a file, in which case log lines are written there instead.
func init() {
	zapCfg := zap.NewProductionConfig()
	zapCfg.Encoding = "console"
	zapCfg.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
	// Disable sampling so no debug lines are dropped — this logger exists
	// specifically to trace a wantlist leak, so every entry matters.
	zapCfg.Sampling = nil
	zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder

	if logfp := os.Getenv("BS_SPECIAL_LOG_FILE"); len(logfp) > 0 {
		if path, err := filepath.Abs(logfp); err != nil {
			// %q already quotes its argument; the previous "'%q'" format
			// printed doubled quotes like '"/tmp/x"'.
			fmt.Fprintf(os.Stderr, "failed to resolve log path %q: %s\n", logfp, err)
		} else {
			zapCfg.OutputPaths = []string{path}
		}
	}

	// zapCfg is already a zap.Config, so the former identity conversion
	// zap.Config(zapCfg) was dropped.
	zapCfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)
	newlog, err := zapCfg.Build()
	if err != nil {
		panic(err)
	}
	speclog = newlog.Named("bs:special").Sugar()
}

// GetSpecialLogger returns the package-wide wantlist-leak debug logger
// configured in init.
// NOTE(review): idiomatic Go would drop the Get prefix (SpecialLogger), but
// the exported name is kept unchanged for existing callers.
func GetSpecialLogger() *zap.SugaredLogger {
	return speclog
}
22 changes: 22 additions & 0 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bsgetter "github.com/ipfs/go-bitswap/internal/getter"
lu "github.com/ipfs/go-bitswap/internal/lu"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
bspm "github.com/ipfs/go-bitswap/internal/peermanager"
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
Expand All @@ -21,6 +22,8 @@ import (
var log = logging.Logger("bs:sess")
var sflog = log.Desugar()

// speclog is the shared wantlist-leak debug logger (see internal/lu).
var speclog = lu.GetSpecialLogger()

const (
broadcastLiveWantsLimit = 64
)
Expand Down Expand Up @@ -161,6 +164,8 @@ func New(ctx context.Context,
}
s.sws = newSessionWantSender(id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)

speclog.Warnf("session %d: <create>", id)

go s.run(ctx)

return s
Expand Down Expand Up @@ -250,6 +255,13 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {

// onWantsSent is called when wants are sent to a peer by the session wants sender
func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	// Leak-debug tracing: record every want-block / want-have this session
	// sends out.
	for _, wb := range wantBlocks {
		speclog.Warnf("session %d: sent want-block %s", s.id, wb)
	}
	for _, wh := range wantHaves {
		speclog.Warnf("session %d: sent want-have %s", s.id, wh)
	}

	// The three-index slice caps wantBlocks at its own length, forcing the
	// append to copy into a fresh backing array rather than overwrite any
	// elements that may follow wantBlocks in the caller's array.
	allBlks := append(wantBlocks[:len(wantBlocks):len(wantBlocks)], wantHaves...)
	s.nonBlockingEnqueue(op{op: opWantsSent, keys: allBlks})
}
Expand Down Expand Up @@ -328,12 +340,17 @@ func (s *Session) run(ctx context.Context) {
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
// If this broadcast is because of an idle timeout (we haven't received
// any blocks for a while) then broadcast all pending wants
dbg := ""
if wants == nil {
dbg = " (idle tick)"
wants = s.sw.PrepareBroadcast()
}

// Broadcast a want-have for the live wants to everyone we're connected to
s.wm.BroadcastWantHaves(ctx, s.id, wants)
for _, w := range wants {
speclog.Warnf("session %d: broadcast%s %s", s.id, dbg, w)
}

// do not find providers on consecutive ticks
// -- just rely on periodic search widening
Expand Down Expand Up @@ -365,6 +382,7 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
s.findMorePeers(ctx, randomWant)

s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant})
speclog.Warnf("session %d: broadcast random want %s (periodic search)", s.id, randomWant)

s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
}
Expand Down Expand Up @@ -392,6 +410,7 @@ func (s *Session) handleShutdown() {
s.sws.Shutdown()
// Remove the session from the want manager
s.wm.RemoveSession(s.ctx, s.id)
speclog.Warnf("session %d: <remove>", s.id)
}

// handleReceive is called when the session receives blocks from a peer
Expand Down Expand Up @@ -441,6 +460,9 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
if len(ks) > 0 {
log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
s.wm.BroadcastWantHaves(ctx, s.id, ks)
for _, w := range ks {
speclog.Warnf("session %d: initial broadcast %s", s.id, w)
}
}
}

Expand Down
11 changes: 11 additions & 0 deletions internal/wantmanager/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
lu "github.com/ipfs/go-bitswap/internal/lu"
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
"github.com/ipfs/go-bitswap/internal/sessionmanager"
bsswl "github.com/ipfs/go-bitswap/internal/sessionwantlist"
Expand Down Expand Up @@ -73,6 +74,10 @@ func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Ci
wm.bcwl.RemoveKeys(blks)
// Send CANCEL to all peers with want-have / want-block
wm.peerHandler.SendCancels(ctx, blks)

for _, k := range blks {
speclog.Warnf("block recvd: send cancel %s", k)
}
}

// BroadcastWantHaves is called when want-haves should be broadcast to all
Expand All @@ -87,6 +92,8 @@ func (wm *WantManager) BroadcastWantHaves(ctx context.Context, ses uint64, wantH
wm.peerHandler.BroadcastWantHaves(ctx, wantHaves)
}

// speclog is the shared wantlist-leak debug logger (see internal/lu).
// NOTE(review): this declaration sits between function definitions;
// conventionally it belongs with the other package-level vars at the top.
var speclog = lu.GetSpecialLogger()

// RemoveSession is called when the session is shut down
func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {
// Remove session's interest in the given blocks.
Expand All @@ -101,6 +108,10 @@ func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {

// Send CANCEL to all peers for blocks that no session is interested in anymore
wm.peerHandler.SendCancels(ctx, cancelKs)

for _, k := range cancelKs {
speclog.Warnf("session %d: (shutdown) send cancel %s", ses, k)
}
}

// Connected is called when a new peer connects
Expand Down