Skip to content

Commit

Permalink
Merge pull request #710 from ipfs/no-goprocess
Browse files Browse the repository at this point in the history
Remove dependency on goprocess
  • Loading branch information
gammazero authored Nov 7, 2024
2 parents 98850cf + 19d1d6b commit 625aadd
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 302 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ The following emojis are used to highlight certain changes:

### Changed

- No longer using `github.com/jbenet/goprocess` to avoid requiring in dependents.

### Removed

### Fixed
Expand Down
7 changes: 3 additions & 4 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ func (bs *Bitswap) Stat() (*Stat, error) {

func (bs *Bitswap) Close() error {
bs.net.Stop()
return multierr.Combine(
bs.Client.Close(),
bs.Server.Close(),
)
bs.Client.Close()
bs.Server.Close()
return nil
}

func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
Expand Down
37 changes: 15 additions & 22 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -117,10 +115,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
// exclusively. We should probably find another way to share logging data
ctx, cancelFunc := context.WithCancel(parent)

px := process.WithTeardown(func() error {
return nil
})

// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
Expand Down Expand Up @@ -165,7 +159,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
bs = &Client{
blockstore: bstore,
network: network,
process: px,
cancel: cancelFunc,
closing: make(chan struct{}),
pm: pm,
sm: sm,
sim: sim,
Expand All @@ -185,16 +180,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore

pqm.Startup()

// bind the context and process.
// do it over here to avoid closing before all setup is done.
go func() {
<-px.Closing() // process closes first
sm.Shutdown()
cancelFunc()
notif.Shutdown()
}()
procctx.CloseAfterContext(px, ctx) // parent cancelled first

return bs
}

Expand All @@ -212,7 +197,9 @@ type Client struct {
// manages channels of outgoing blocks for sessions
notif notifications.PubSub

process process.Process
cancel context.CancelFunc
closing chan struct{}
closeOnce sync.Once

// Counters for various statistics
counterLk sync.Mutex
Expand Down Expand Up @@ -287,7 +274,7 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
defer span.End()

select {
case <-bs.process.Closing():
case <-bs.closing:
return errors.New("bitswap is closed")
default:
}
Expand All @@ -310,10 +297,10 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
return nil
}

// receiveBlocksFrom process blocks received from the network
// receiveBlocksFrom processes blocks received from the network
func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
case <-bs.closing:
return errors.New("bitswap is closed")
default:
}
Expand Down Expand Up @@ -466,7 +453,13 @@ func (bs *Client) ReceiveError(err error) {

// Close is called to shutdown the Client
func (bs *Client) Close() error {
return bs.process.Close()
bs.closeOnce.Do(func() {
close(bs.closing)
bs.sm.Shutdown()
bs.cancel()
bs.notif.Shutdown()
})
return nil
}

// GetWantlist returns the current local wantlist (both want-blocks and
Expand Down
48 changes: 27 additions & 21 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
process "github.com/jbenet/goprocess"
"github.com/libp2p/go-libp2p/core/peer"
mh "github.com/multiformats/go-multihash"
)
Expand Down Expand Up @@ -195,6 +194,9 @@ type Engine struct {

taskWorkerLock sync.Mutex
taskWorkerCount int
waitWorkers sync.WaitGroup
cancel context.CancelFunc
closeOnce sync.Once

targetMessageSize int

Expand Down Expand Up @@ -376,12 +378,13 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer
// more tasks if it has some maximum work already outstanding.
func NewEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
opts ...Option,
) *Engine {
ctx, cancel := context.WithCancel(context.Background())

e := &Engine{
scoreLedger: NewDefaultScoreLedger(),
bstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount,
Expand All @@ -401,6 +404,7 @@ func NewEngine(
tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()),
maxQueuedWantlistEntriesPerPeer: defaults.MaxQueuedWantlistEntiresPerPeer,
maxCidSize: defaults.MaximumAllowedCid,
cancel: cancel,
}

for _, opt := range opts {
Expand Down Expand Up @@ -437,6 +441,8 @@ func NewEngine(
log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize)
}

e.startWorkers(ctx)

return e
}

Expand All @@ -462,43 +468,42 @@ func (e *Engine) SetSendDontHaves(send bool) {
// Starts the score ledger. Before start the function checks and,
// if it is unset, initializes the scoreLedger with the default
// implementation.
func (e *Engine) startScoreLedger(px process.Process) {
func (e *Engine) startScoreLedger() {
e.scoreLedger.Start(func(p peer.ID, score int) {
if score == 0 {
e.peerTagger.UntagPeer(p, e.tagUseful)
} else {
e.peerTagger.TagPeer(p, e.tagUseful, score)
}
})
px.Go(func(ppx process.Process) {
<-ppx.Closing()
e.scoreLedger.Stop()
})
}

func (e *Engine) startBlockstoreManager(px process.Process) {
// startWorkers starts workers to handle requests from other nodes for the data
// on this node.
func (e *Engine) startWorkers(ctx context.Context) {
e.bsm.start()
px.Go(func(ppx process.Process) {
<-ppx.Closing()
e.bsm.stop()
})
}

// Start up workers to handle requests from other nodes for the data on this node
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
e.startBlockstoreManager(px)
e.startScoreLedger(px)
e.startScoreLedger()

e.taskWorkerLock.Lock()
defer e.taskWorkerLock.Unlock()

e.waitWorkers.Add(e.taskWorkerCount)
for i := 0; i < e.taskWorkerCount; i++ {
px.Go(func(_ process.Process) {
e.taskWorker(ctx)
})
go e.taskWorker(ctx)
}
}

// Close shuts down the decision engine and returns after all workers have
// finished. Safe to call multiple times/concurrently.
func (e *Engine) Close() {
e.closeOnce.Do(func() {
e.cancel()
e.bsm.stop()
e.scoreLedger.Stop()
})
e.waitWorkers.Wait()
}

func (e *Engine) onPeerAdded(p peer.ID) {
e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}
Expand All @@ -524,6 +529,7 @@ func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
// and adds them to an envelope that is passed off to the bitswap workers,
// which send the message to the network.
func (e *Engine) taskWorker(ctx context.Context) {
defer e.waitWorkers.Done()
defer e.taskWorkerExit()
for {
oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
Expand Down
Loading

0 comments on commit 625aadd

Please sign in to comment.