diff --git a/bitswap.go b/bitswap.go index 03694302..72a80908 100644 --- a/bitswap.go +++ b/bitswap.go @@ -7,10 +7,10 @@ import ( "errors" "fmt" - "sync" "time" delay "github.com/ipfs/go-ipfs-delay" + "github.com/sasha-s/go-deadlock" deciface "github.com/ipfs/go-bitswap/decision" bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" @@ -58,6 +58,12 @@ var ( timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600} ) +func init() { + deadlock.Opts.OnPotentialDeadlock = func() {} + deadlock.Opts.DeadlockTimeout = 1 * time.Minute + deadlock.Opts.MaxMapSize = 1024 * 640 +} + // Option defines the functional option type that can be used to configure // bitswap instances type Option func(*Bitswap) @@ -325,7 +331,7 @@ type Bitswap struct { process process.Process // Counters for various statistics - counterLk sync.Mutex + counterLk deadlock.Mutex counters *counters // Metrics interface metrics @@ -585,7 +591,7 @@ func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) { func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool { res := make([]bool, len(blks)) - wg := sync.WaitGroup{} + wg := deadlock.WaitGroup{} for i, block := range blks { wg.Add(1) go func(i int, b blocks.Block) { diff --git a/go.mod b/go.mod index 91467c66..f73af00f 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/libp2p/go-msgio v0.0.6 github.com/multiformats/go-multiaddr v0.3.3 github.com/multiformats/go-multistream v0.2.2 + github.com/sasha-s/go-deadlock v0.3.1 github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.16.0 ) diff --git a/go.sum b/go.sum index d186ebf0..c07a00de 100644 --- a/go.sum +++ b/go.sum @@ -716,6 +716,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -763,6 +765,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= +github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= diff --git a/internal/blockpresencemanager/blockpresencemanager.go b/internal/blockpresencemanager/blockpresencemanager.go index 1d3acb0e..d52942f2 100644 --- a/internal/blockpresencemanager/blockpresencemanager.go +++ b/internal/blockpresencemanager/blockpresencemanager.go @@ -1,16 +1,15 @@ package blockpresencemanager import ( - "sync" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/sasha-s/go-deadlock" ) // BlockPresenceManager keeps track of which peers have indicated that they // have or explicitly don't have a block type BlockPresenceManager struct { - sync.RWMutex + deadlock.RWMutex presence map[cid.Cid]map[peer.ID]bool } diff --git a/internal/decision/blockstoremanager.go b/internal/decision/blockstoremanager.go index 7d6864eb..38b32c86 100644 --- a/internal/decision/blockstoremanager.go +++ b/internal/decision/blockstoremanager.go @@ -3,13 +3,13 @@ package decision import ( "context" "fmt" - "sync" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" bstore "github.com/ipfs/go-ipfs-blockstore" "github.com/ipfs/go-metrics-interface" process "github.com/jbenet/goprocess" + "github.com/sasha-s/go-deadlock" ) // blockstoreManager maintains a pool of workers that make requests to the blockstore. @@ -83,7 +83,7 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) ( return res, nil } - var lk sync.Mutex + var lk deadlock.Mutex return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) { size, err := bsm.bs.GetSize(c) if err != nil { @@ -105,7 +105,7 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[ return res, nil } - var lk sync.Mutex + var lk deadlock.Mutex return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) { blk, err := bsm.bs.Get(c) if err != nil { @@ -123,7 +123,7 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[ func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) error { var err error - wg := sync.WaitGroup{} + wg := deadlock.WaitGroup{} for _, k := range ks { c := k wg.Add(1) diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 76519bd3..b3309d97 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -4,10 +4,10 @@ package decision import ( "context" "fmt" - "sync" "time" "github.com/google/uuid" + "github.com/sasha-s/go-deadlock" bsmsg "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" @@ -142,7 +142,7 @@ type Engine struct { tagQueued, tagUseful string - lock sync.RWMutex // protects the fields immediately below + lock deadlock.RWMutex // protects the fields immediately below // ledgerMap lists block-related Ledgers by their Partner key. ledgerMap map[peer.ID]*ledger @@ -155,7 +155,7 @@ type Engine struct { ticker *time.Ticker - taskWorkerLock sync.Mutex + taskWorkerLock deadlock.Mutex taskWorkerCount int // maxBlockSizeReplaceHasWithBlock is the maximum size of the block in @@ -173,7 +173,7 @@ type Engine struct { activeGauge metrics.Gauge // used to ensure metrics are reported each fixed number of operation - metricsLock sync.Mutex + metricsLock deadlock.Mutex metricUpdateCounter int } diff --git a/internal/decision/ledger.go b/internal/decision/ledger.go index 58723d0f..ae4afe55 100644 --- a/internal/decision/ledger.go +++ b/internal/decision/ledger.go @@ -1,10 +1,9 @@ package decision import ( - "sync" - pb "github.com/ipfs/go-bitswap/message/pb" wl "github.com/ipfs/go-bitswap/wantlist" + "github.com/sasha-s/go-deadlock" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" @@ -25,7 +24,7 @@ type ledger struct { // wantList is a (bounded, small) set of keys that Partner desires. wantList *wl.Wantlist - lk sync.RWMutex + lk deadlock.RWMutex } func (l *ledger) Wants(k cid.Cid, priority int32, wantType pb.Message_Wantlist_WantType) { diff --git a/internal/decision/scoreledger.go b/internal/decision/scoreledger.go index 188c998a..0af58bc0 100644 --- a/internal/decision/scoreledger.go +++ b/internal/decision/scoreledger.go @@ -1,11 +1,11 @@ package decision import ( - "sync" "time" "github.com/benbjohnson/clock" peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/sasha-s/go-deadlock" ) const ( @@ -55,7 +55,7 @@ type scoreledger struct { exchangeCount uint64 // the record lock - lock sync.RWMutex + lock deadlock.RWMutex clock clock.Clock } @@ -110,7 +110,7 @@ type DefaultScoreLedger struct { // is closed on Close closing chan struct{} // protects the fields immediatly below - lock sync.RWMutex + lock deadlock.RWMutex // ledgerMap lists score ledgers by their partner key. ledgerMap map[peer.ID]*scoreledger // how frequently the engine should sample peer usefulness diff --git a/internal/messagequeue/donthavetimeoutmgr.go b/internal/messagequeue/donthavetimeoutmgr.go index e1b42c42..45e2ab21 100644 --- a/internal/messagequeue/donthavetimeoutmgr.go +++ b/internal/messagequeue/donthavetimeoutmgr.go @@ -2,12 +2,12 @@ package messagequeue import ( "context" - "sync" "time" "github.com/benbjohnson/clock" cid "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/sasha-s/go-deadlock" ) const ( @@ -73,7 +73,7 @@ type dontHaveTimeoutMgr struct { maxExpectedWantProcessTime time.Duration // All variables below here must be protected by the lock - lk sync.RWMutex + lk deadlock.RWMutex // has the timeout manager started started bool // wants that are active (waiting for a response or timeout) diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 19bab762..90d9d021 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -3,7 +3,6 @@ package messagequeue import ( "context" "math" - "sync" "time" "github.com/benbjohnson/clock" @@ -15,6 +14,7 @@ import ( logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/sasha-s/go-deadlock" "go.uber.org/zap" ) @@ -83,7 +83,7 @@ type MessageQueue struct { responses chan []cid.Cid // Take lock whenever any of these variables are modified - wllock sync.Mutex + wllock deadlock.Mutex bcstWants recallWantlist peerWants recallWantlist cancels *cid.Set @@ -91,7 +91,7 @@ type MessageQueue struct { // Dont touch any of these variables outside of run loop sender bsnet.MessageSender - rebroadcastIntervalLk sync.RWMutex + rebroadcastIntervalLk deadlock.RWMutex rebroadcastInterval time.Duration rebroadcastTimer *clock.Timer // For performance reasons we just clear out the fields of the message diff --git a/internal/notifications/notifications.go b/internal/notifications/notifications.go index 7defea73..a05f2a7e 100644 --- a/internal/notifications/notifications.go +++ b/internal/notifications/notifications.go @@ -2,11 +2,11 @@ package notifications import ( "context" - "sync" pubsub "github.com/cskr/pubsub" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + "github.com/sasha-s/go-deadlock" ) const bufferSize = 16 @@ -29,7 +29,7 @@ func New() PubSub { } type impl struct { - lk sync.RWMutex + lk deadlock.RWMutex wrapped pubsub.PubSub closed chan struct{} diff --git a/internal/peermanager/peermanager.go b/internal/peermanager/peermanager.go index 1d4538a7..1299348d 100644 --- a/internal/peermanager/peermanager.go +++ b/internal/peermanager/peermanager.go @@ -2,10 +2,10 @@ package peermanager import ( "context" - "sync" logging "github.com/ipfs/go-log" "github.com/ipfs/go-metrics-interface" + "github.com/sasha-s/go-deadlock" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" @@ -34,7 +34,7 @@ type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue // PeerManager manages a pool of peers and sends messages to peers in the pool. type PeerManager struct { // sync access to peerQueues and peerWantManager - pqLk sync.RWMutex + pqLk deadlock.RWMutex // peerQueues -- interact through internal utility functions get/set/remove/iterate peerQueues map[peer.ID]PeerQueue pwm *peerWantManager @@ -42,7 +42,7 @@ type PeerManager struct { createPeerQueue PeerQueueFactory ctx context.Context - psLk sync.RWMutex + psLk deadlock.RWMutex sessions map[uint64]Session peerSessions map[peer.ID]map[uint64]struct{} diff --git a/internal/providerquerymanager/providerquerymanager.go b/internal/providerquerymanager/providerquerymanager.go index d47ffdb5..43df97d7 100644 --- a/internal/providerquerymanager/providerquerymanager.go +++ b/internal/providerquerymanager/providerquerymanager.go @@ -3,12 +3,12 @@ package providerquerymanager import ( "context" "fmt" - "sync" "time" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/sasha-s/go-deadlock" ) var log = logging.Logger("bitswap") @@ -77,7 +77,7 @@ type ProviderQueryManager struct { incomingFindProviderRequests chan *findProviderRequest findProviderTimeout time.Duration - timeoutMutex sync.RWMutex + timeoutMutex deadlock.RWMutex // do not touch outside the run loop inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus @@ -232,7 +232,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() { findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) pqm.timeoutMutex.RUnlock() providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) - wg := &sync.WaitGroup{} + wg := &deadlock.WaitGroup{} for p := range providers { wg.Add(1) go func(p peer.ID) { diff --git a/internal/sessioninterestmanager/sessioninterestmanager.go b/internal/sessioninterestmanager/sessioninterestmanager.go index 0ab32ed1..1142e111 100644 --- a/internal/sessioninterestmanager/sessioninterestmanager.go +++ b/internal/sessioninterestmanager/sessioninterestmanager.go @@ -1,16 +1,15 @@ package sessioninterestmanager import ( - "sync" - blocks "github.com/ipfs/go-block-format" + "github.com/sasha-s/go-deadlock" cid "github.com/ipfs/go-cid" ) // SessionInterestManager records the CIDs that each session is interested in. type SessionInterestManager struct { - lk sync.RWMutex + lk deadlock.RWMutex wants map[cid.Cid]map[uint64]bool } diff --git a/internal/sessionmanager/sessionmanager.go b/internal/sessionmanager/sessionmanager.go index 42b20938..ea2aad21 100644 --- a/internal/sessionmanager/sessionmanager.go +++ b/internal/sessionmanager/sessionmanager.go @@ -2,11 +2,11 @@ package sessionmanager import ( "context" - "sync" "time" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" + "github.com/sasha-s/go-deadlock" bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" notifications "github.com/ipfs/go-bitswap/internal/notifications" @@ -53,11 +53,11 @@ type SessionManager struct { notif notifications.PubSub // Sessions - sessLk sync.RWMutex + sessLk deadlock.RWMutex sessions map[uint64]Session // Session Index - sessIDLk sync.Mutex + sessIDLk deadlock.Mutex sessID uint64 self peer.ID diff --git a/internal/sessionpeermanager/sessionpeermanager.go b/internal/sessionpeermanager/sessionpeermanager.go index db46691b..794a2f4e 100644 --- a/internal/sessionpeermanager/sessionpeermanager.go +++ b/internal/sessionpeermanager/sessionpeermanager.go @@ -2,9 +2,9 @@ package sessionpeermanager import ( "fmt" - "sync" logging "github.com/ipfs/go-log" + "github.com/sasha-s/go-deadlock" peer "github.com/libp2p/go-libp2p-core/peer" ) @@ -32,7 +32,7 @@ type SessionPeerManager struct { tag string id uint64 - plk sync.RWMutex + plk deadlock.RWMutex peers map[peer.ID]struct{} peersDiscovered bool } diff --git a/network/connecteventmanager.go b/network/connecteventmanager.go index b28e8e5b..f9d26fb7 100644 --- a/network/connecteventmanager.go +++ b/network/connecteventmanager.go @@ -1,9 +1,8 @@ package network import ( - "sync" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/sasha-s/go-deadlock" ) type ConnectionListener interface { @@ -13,7 +12,7 @@ type ConnectionListener interface { type connectEventManager struct { connListener ConnectionListener - lk sync.RWMutex + lk deadlock.RWMutex conns map[peer.ID]*connState } diff --git a/testnet/virtual.go b/testnet/virtual.go index 66f5e821..13345a1a 100644 --- a/testnet/virtual.go +++ b/testnet/virtual.go @@ -4,12 +4,12 @@ import ( "context" "errors" "sort" - "sync" "sync/atomic" "time" bsmsg "github.com/ipfs/go-bitswap/message" bsnet "github.com/ipfs/go-bitswap/network" + "github.com/sasha-s/go-deadlock" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" @@ -59,7 +59,7 @@ func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenera } type network struct { - mu sync.Mutex + mu deadlock.Mutex latencies map[peer.ID]map[peer.ID]time.Duration rateLimiters map[peer.ID]map[peer.ID]*mocknet.RateLimiter clients map[peer.ID]*receiverQueue @@ -83,7 +83,7 @@ type receiverQueue struct { receiver *networkClient queue []*message active bool - lk sync.Mutex + lk deadlock.Mutex } func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork {