From 1b8954cdd4b462e2787913cd82cc8a1b30c46393 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 30 Oct 2024 10:40:30 -1000 Subject: [PATCH 1/3] Remove dependency on goprocess The dependency on goprocess is not needed by boxo, and removing it removes the need to support it in code dependent on boxo. Closes #709 --- CHANGELOG.md | 2 + bitswap/bitswap.go | 7 +- bitswap/client/client.go | 38 ++-- bitswap/server/internal/decision/engine.go | 48 +++-- .../server/internal/decision/engine_test.go | 202 ++++++++---------- bitswap/server/server.go | 108 +++++----- bootstrap/bootstrap.go | 108 ++++++---- go.mod | 2 +- go.sum | 1 - namesys/republisher/repub.go | 23 +- namesys/republisher/repub_test.go | 9 +- 11 files changed, 272 insertions(+), 276 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 176a0fb7e..6c5c621d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ The following emojis are used to highlight certain changes: ### Changed +- No longer using `github.com/jbenet/goprocess` to avoid requiring in dependents. + ### Removed ### Fixed diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 393ab96ad..ddc50f6dd 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -144,10 +144,9 @@ func (bs *Bitswap) Stat() (*Stat, error) { func (bs *Bitswap) Close() error { bs.net.Stop() - return multierr.Combine( - bs.Client.Close(), - bs.Server.Close(), - ) + bs.Client.Close() + bs.Server.Close() + return nil } func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid { diff --git a/bitswap/client/client.go b/bitswap/client/client.go index fc735f448..b4bc91b58 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -31,8 +31,6 @@ import ( delay "github.com/ipfs/go-ipfs-delay" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-metrics-interface" - process "github.com/jbenet/goprocess" - procctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p/core/peer" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -117,10 +115,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore // exclusively. We should probably find another way to share logging data ctx, cancelFunc := context.WithCancel(parent) - px := process.WithTeardown(func() error { - return nil - }) - // onDontHaveTimeout is called when a want-block is sent to a peer that // has an old version of Bitswap that doesn't support DONT_HAVE messages, // or when no response is received within a timeout. @@ -165,7 +159,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore bs = &Client{ blockstore: bstore, network: network, - process: px, + cancel: cancelFunc, + closing: make(chan struct{}), pm: pm, sm: sm, sim: sim, @@ -185,16 +180,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore pqm.Startup() - // bind the context and process. - // do it over here to avoid closing before all setup is done. 
- go func() { - <-px.Closing() // process closes first - sm.Shutdown() - cancelFunc() - notif.Shutdown() - }() - procctx.CloseAfterContext(px, ctx) // parent cancelled first - return bs } @@ -212,7 +197,9 @@ type Client struct { // manages channels of outgoing blocks for sessions notif notifications.PubSub - process process.Process + cancel context.CancelFunc + closing chan struct{} + closeOnce sync.Once // Counters for various statistics counterLk sync.Mutex @@ -287,7 +274,7 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err defer span.End() select { - case <-bs.process.Closing(): + case <-bs.closing: return errors.New("bitswap is closed") default: } @@ -310,10 +297,10 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err return nil } -// receiveBlocksFrom process blocks received from the network +// receiveBlocksFrom processes blocks received from the network func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { select { - case <-bs.process.Closing(): + case <-bs.closing: return errors.New("bitswap is closed") default: } @@ -465,8 +452,13 @@ func (bs *Client) ReceiveError(err error) { } // Close is called to shutdown the Client -func (bs *Client) Close() error { - return bs.process.Close() +func (bs *Client) Close() { + bs.closeOnce.Do(func() { + close(bs.closing) + bs.sm.Shutdown() + bs.cancel() + bs.notif.Shutdown() + }) } // GetWantlist returns the current local wantlist (both want-blocks and diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 5e4463e33..a46d67dd7 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -25,7 +25,6 @@ import ( "github.com/ipfs/go-peertaskqueue" "github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipfs/go-peertaskqueue/peertracker" - process "github.com/jbenet/goprocess" "github.com/libp2p/go-libp2p/core/peer" mh "github.com/multiformats/go-multihash" ) @@ -195,6 +194,9 @@ type Engine struct { taskWorkerLock sync.Mutex taskWorkerCount int + waitWorkers sync.WaitGroup + cancel context.CancelFunc + closeOnce sync.Once targetMessageSize int @@ -376,12 +378,13 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { // maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer // more tasks if it has some maximum work already outstanding. func NewEngine( - ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, opts ...Option, ) *Engine { + ctx, cancel := context.WithCancel(context.Background()) + e := &Engine{ scoreLedger: NewDefaultScoreLedger(), bstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount, @@ -401,6 +404,7 @@ func NewEngine( tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()), maxQueuedWantlistEntriesPerPeer: defaults.MaxQueuedWantlistEntiresPerPeer, maxCidSize: defaults.MaximumAllowedCid, + cancel: cancel, } for _, opt := range opts { @@ -437,6 +441,8 @@ func NewEngine( log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize) } + e.startWorkers(ctx) + return e } @@ -462,7 +468,7 @@ func (e *Engine) SetSendDontHaves(send bool) { // Starts the score ledger. Before start the function checks and, // if it is unset, initializes the scoreLedger with the default // implementation. 
-func (e *Engine) startScoreLedger(px process.Process) { +func (e *Engine) startScoreLedger() { e.scoreLedger.Start(func(p peer.ID, score int) { if score == 0 { e.peerTagger.UntagPeer(p, e.tagUseful) @@ -470,35 +476,34 @@ func (e *Engine) startScoreLedger(px process.Process) { e.peerTagger.TagPeer(p, e.tagUseful, score) } }) - px.Go(func(ppx process.Process) { - <-ppx.Closing() - e.scoreLedger.Stop() - }) } -func (e *Engine) startBlockstoreManager(px process.Process) { +// startWorkers starts workers to handle requests from other nodes for the data +// on this node. +func (e *Engine) startWorkers(ctx context.Context) { e.bsm.start() - px.Go(func(ppx process.Process) { - <-ppx.Closing() - e.bsm.stop() - }) -} - -// Start up workers to handle requests from other nodes for the data on this node -func (e *Engine) StartWorkers(ctx context.Context, px process.Process) { - e.startBlockstoreManager(px) - e.startScoreLedger(px) + e.startScoreLedger() e.taskWorkerLock.Lock() defer e.taskWorkerLock.Unlock() + e.waitWorkers.Add(e.taskWorkerCount) for i := 0; i < e.taskWorkerCount; i++ { - px.Go(func(_ process.Process) { - e.taskWorker(ctx) - }) + go e.taskWorker(ctx) } } +// Close shuts down the decision engine and returns after all workers have +// finished. Safe to call multiple times/concurrently. +func (e *Engine) Close() { + e.closeOnce.Do(func() { + e.cancel() + e.bsm.stop() + e.scoreLedger.Stop() + }) + e.waitWorkers.Wait() +} + func (e *Engine) onPeerAdded(p peer.ID) { e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight) } @@ -524,6 +529,7 @@ func (e *Engine) LedgerForPeer(p peer.ID) *Receipt { // and adds them to an envelope that is passed off to the bitswap workers, // which send the message to the network. func (e *Engine) taskWorker(ctx context.Context) { + defer e.waitWorkers.Done() defer e.taskWorkerExit() for { oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index 5cc1375c7..d549e1218 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -23,7 +23,6 @@ import ( ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-test/random" - process "github.com/jbenet/goprocess" peer "github.com/libp2p/go-libp2p/core/peer" libp2ptest "github.com/libp2p/go-libp2p/core/test" mh "github.com/multiformats/go-multihash" @@ -95,15 +94,14 @@ type engineSet struct { Blockstore blockstore.Blockstore } -func newTestEngine(ctx context.Context, idStr string, opts ...Option) engineSet { - return newTestEngineWithSampling(ctx, idStr, shortTerm, nil, clock.New(), opts...) +func newTestEngine(idStr string, opts ...Option) engineSet { + return newTestEngineWithSampling(idStr, shortTerm, nil, clock.New(), opts...) } -func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock, opts ...Option) engineSet { +func newTestEngineWithSampling(idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock, opts ...Option) engineSet { fpt := &fakePeerTagger{} bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, append(opts[:len(opts):len(opts)], WithScoreLedger(NewTestScoreLedger(peerSampleInterval, sampleCh, clock)), WithBlockstoreWorkerCount(4))...) 
- e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, fpt, "localhost", 0, append(opts[:len(opts):len(opts)], WithScoreLedger(NewTestScoreLedger(peerSampleInterval, sampleCh, clock)), WithBlockstoreWorkerCount(4))...) return engineSet{ Peer: peer.ID(idStr), PeerTagger: fpt, @@ -113,20 +111,19 @@ func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInte } func TestConsistentAccounting(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - sender := newTestEngine(ctx, "Ernie") - receiver := newTestEngine(ctx, "Bert") + sender := newTestEngine("Ernie") + defer sender.Engine.Close() + receiver := newTestEngine("Bert") + defer receiver.Engine.Close() // Send messages from Ernie to Bert for i := 0; i < 1000; i++ { - m := message.New(false) content := []string{"this", "is", "message", "i"} m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) sender.Engine.MessageSent(receiver.Peer, m) - receiver.Engine.MessageReceived(ctx, sender.Peer, m) + receiver.Engine.MessageReceived(context.Background(), sender.Peer, m) receiver.Engine.ReceivedBlocks(sender.Peer, m.Blocks()) } @@ -148,17 +145,17 @@ func TestConsistentAccounting(t *testing.T) { } func TestPeerIsAddedToPeersWhenMessageSent(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - sanfrancisco := newTestEngine(ctx, "sf") - seattle := newTestEngine(ctx, "sea") + sanfrancisco := newTestEngine("sf") + defer sanfrancisco.Engine.Close() + seattle := newTestEngine("sea") + defer seattle.Engine.Close() m := message.New(true) // We need to request something for it to add us as partner. m.AddEntry(blocks.NewBlock([]byte("Hæ")).Cid(), 0, pb.Message_Wantlist_Block, true) - seattle.Engine.MessageReceived(ctx, sanfrancisco.Peer, m) + seattle.Engine.MessageReceived(context.Background(), sanfrancisco.Peer, m) if seattle.Peer == sanfrancisco.Peer { t.Fatal("Sanity Check: Peers have same Key!") @@ -184,7 +181,6 @@ func peerIsPartner(p peer.ID, e *Engine) bool { } func newEngineForTesting( - ctx context.Context, bs blockstore.Blockstore, peerTagger PeerTagger, self peer.ID, @@ -192,14 +188,13 @@ func newEngineForTesting( opts ...Option, ) *Engine { opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize)) - return NewEngine(ctx, bs, peerTagger, self, opts...) + return NewEngine(bs, peerTagger, self, opts...) 
} func TestOutboxClosedWhenEngineClosed(t *testing.T) { t.SkipNow() // TODO implement *Engine.Close - ctx := context.Background() - e := newEngineForTesting(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) + defer e.Close() var wg sync.WaitGroup wg.Add(1) go func() { @@ -526,9 +521,8 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) { testCases = onlyTestCases } - ctx := context.Background() - e := newEngineForTesting(ctx, bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) + defer e.Close() for i, testCase := range testCases { t.Logf("Test case %d:", i) for _, wl := range testCase.wls { @@ -683,9 +677,8 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) { testCases = onlyTestCases } - ctx := context.Background() - e := newEngineForTesting(ctx, bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) + defer e.Close() var next envChan for i, testCase := range testCases { @@ -866,11 +859,10 @@ func TestPartnerWantsThenCancels(t *testing.T) { } } - ctx := context.Background() for i := 0; i < numRounds; i++ { expected := make([][]string, 0, len(testcases)) - e := newEngineForTesting(ctx, bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) + defer e.Close() for _, testcase := range testcases { set := testcase[0] cancels := testcase[1] @@ -894,9 +886,8 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) { partner := libp2ptest.RandPeerIDFatal(t) otherPeer := libp2ptest.RandPeerIDFatal(t) - ctx := context.Background() - e := newEngineForTesting(ctx, bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) + defer e.Close() blks := random.BlocksOfSize(4, 8*1024) msg := message.New(false) @@ -940,9 +931,8 @@ func TestSendDontHave(t *testing.T) { partner := libp2ptest.RandPeerIDFatal(t) otherPeer := libp2ptest.RandPeerIDFatal(t) - ctx := context.Background() - e := newEngineForTesting(ctx, 
bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) + defer e.Close() blks := random.BlocksOfSize(4, 8*1024) msg := message.New(false) @@ -1006,9 +996,8 @@ func TestWantlistForPeer(t *testing.T) { partner := libp2ptest.RandPeerIDFatal(t) otherPeer := libp2ptest.RandPeerIDFatal(t) - ctx := context.Background() - e := newEngineForTesting(ctx, bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, &fakePeerTagger{}, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4)) + defer e.Close() blks := random.BlocksOfSize(4, 8*1024) msg := message.New(false) @@ -1039,9 +1028,6 @@ func TestWantlistForPeer(t *testing.T) { } func TestTaskComparator(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - keys := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"} cids := make(map[cid.Cid]int) blks := make([]blocks.Block, 0, len(keys)) @@ -1054,19 +1040,22 @@ func TestTaskComparator(t *testing.T) { fpt := &fakePeerTagger{} sl := NewTestScoreLedger(shortTerm, nil, clock.New()) bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + if err := bs.PutMany(ctx, blks); err != nil { t.Fatal(err) } // use a single task worker so that the order of outgoing messages is deterministic - e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(sl), WithBlockstoreWorkerCount(4), WithTaskWorkerCount(1), + e := newEngineForTesting(bs, fpt, "localhost", 0, WithScoreLedger(sl), WithBlockstoreWorkerCount(4), WithTaskWorkerCount(1), // if this Option is omitted, the test fails WithTaskComparator(func(ta, tb *TaskInfo) bool { // prioritize based on lexicographic ordering of block content return cids[ta.Cid] < cids[tb.Cid] }), ) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + defer e.Close() // rely on randomness of Go map's iteration order to add Want entries in random order peerIDs := make([]peer.ID, len(keys)) @@ -1093,9 +1082,6 @@ func TestTaskComparator(t *testing.T) { } func TestPeerBlockFilter(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - // Generate a few keys keys := []string{"a", "b", "c", "d"} blks := make([]blocks.Block, 0, len(keys)) @@ -1114,11 +1100,14 @@ func TestPeerBlockFilter(t *testing.T) { fpt := &fakePeerTagger{} sl := NewTestScoreLedger(shortTerm, nil, clock.New()) bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + if err := bs.PutMany(ctx, blks); err != nil { t.Fatal(err) } - e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(sl), WithBlockstoreWorkerCount(4), + e := newEngineForTesting(bs, fpt, "localhost", 0, WithScoreLedger(sl), WithBlockstoreWorkerCount(4), WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool { // peer 0 has access to everything 
if p == peerIDs[0] { @@ -1132,7 +1121,7 @@ func TestPeerBlockFilter(t *testing.T) { return blks[3].Cid().Equals(c) }), ) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + defer e.Close() // Setup the test type testCaseEntry struct { @@ -1252,9 +1241,6 @@ func TestPeerBlockFilter(t *testing.T) { } func TestPeerBlockFilterMutability(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - // Generate a few keys keys := []string{"a", "b", "c", "d"} blks := make([]blocks.Block, 0, len(keys)) @@ -1269,18 +1255,21 @@ func TestPeerBlockFilterMutability(t *testing.T) { fpt := &fakePeerTagger{} sl := NewTestScoreLedger(shortTerm, nil, clock.New()) bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + if err := bs.PutMany(ctx, blks); err != nil { t.Fatal(err) } filterAllowList := make(map[cid.Cid]bool) - e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(sl), WithBlockstoreWorkerCount(4), + e := newEngineForTesting(bs, fpt, "localhost", 0, WithScoreLedger(sl), WithBlockstoreWorkerCount(4), WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool { return filterAllowList[c] }), ) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + defer e.Close() // Setup the test type testCaseEntry struct { @@ -1421,10 +1410,10 @@ func TestPeerBlockFilterMutability(t *testing.T) { } func TestTaggingPeers(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - sanfrancisco := newTestEngine(ctx, "sf") - seattle := newTestEngine(ctx, "sea") + sanfrancisco := newTestEngine("sf") + defer sanfrancisco.Engine.Close() + seattle := newTestEngine("sea") + defer seattle.Engine.Close() keys := []string{"a", "b", "c", "d", "e"} for _, letter := range keys { @@ -1451,12 +1440,10 @@ func TestTaggingPeers(t *testing.T) { func TestTaggingUseful(t *testing.T) { const peerSampleIntervalHalf = 10 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - sampleCh := make(chan struct{}) mockClock := clock.NewMock() - me := newTestEngineWithSampling(ctx, "engine", peerSampleIntervalHalf*2, sampleCh, mockClock) + me := newTestEngineWithSampling("engine", peerSampleIntervalHalf*2, sampleCh, mockClock) + defer me.Engine.Close() mockClock.Add(1 * time.Millisecond) friend := peer.ID("friend") @@ -1544,9 +1531,6 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) { type envChan <-chan *Envelope func getNextEnvelope(e *Engine, next envChan, t time.Duration) (envChan, *Envelope) { - ctx, cancel := context.WithTimeout(context.Background(), t) - defer cancel() - if next == nil { next = <-e.Outbox() // returns immediately } @@ -1558,7 +1542,7 @@ func getNextEnvelope(e *Engine, next envChan, t time.Duration) (envChan, *Envelo return nil, nil } return nil, env - case <-ctx.Done(): + case <-time.After(t): // log.Warnf("got timeout") } return next, nil @@ -1606,12 +1590,11 @@ func stringsComplement(set, subset []string) []string { } func TestWantlistDoesNotGrowPastLimit(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - const limit = 32 - warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) - riga := newTestEngine(ctx, "riga") + warsaw := newTestEngine("warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) + defer warsaw.Engine.Close() + 
riga := newTestEngine("riga") + defer riga.Engine.Close() // Send in two messages to test reslicing. for i := 2; i != 0; i-- { @@ -1619,7 +1602,7 @@ func TestWantlistDoesNotGrowPastLimit(t *testing.T) { for j := limit * 3 / 4; j != 0; j-- { m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(i, j))).Cid(), 0, pb.Message_Wantlist_Block, true) } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) } if warsaw.Peer == riga.Peer { @@ -1633,19 +1616,19 @@ func TestWantlistDoesNotGrowPastLimit(t *testing.T) { } func TestWantlistGrowsToLimit(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - const limit = 32 - warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) - riga := newTestEngine(ctx, "riga") + warsaw := newTestEngine("warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit)) + defer warsaw.Engine.Close() + riga := newTestEngine("riga") + defer riga.Engine.Close() // Send in two messages to test reslicing. m := message.New(false) for j := limit; j != 0; j-- { m.AddEntry(blocks.NewBlock([]byte(strconv.Itoa(j))).Cid(), 0, pb.Message_Wantlist_Block, true) } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) if warsaw.Peer == riga.Peer { t.Fatal("Sanity Check: Peers have same Key!") @@ -1658,12 +1641,11 @@ func TestWantlistGrowsToLimit(t *testing.T) { } func TestIgnoresCidsAboveLimit(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - const cidLimit = 64 - warsaw := newTestEngine(ctx, "warsaw", WithMaxCidSize(cidLimit)) - riga := newTestEngine(ctx, "riga") + warsaw := newTestEngine("warsaw", WithMaxCidSize(cidLimit)) + defer warsaw.Engine.Close() + riga := newTestEngine("riga") + defer riga.Engine.Close() // Send in two messages to test reslicing. 
m := message.New(true) @@ -1678,7 +1660,7 @@ func TestIgnoresCidsAboveLimit(t *testing.T) { rand.Read(hash[startOfDigest:]) m.AddEntry(cid.NewCidV1(cid.Raw, hash), 0, pb.Message_Wantlist_Block, true) - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) if warsaw.Peer == riga.Peer { t.Fatal("Sanity Check: Peers have same Key!") @@ -1691,11 +1673,10 @@ func TestIgnoresCidsAboveLimit(t *testing.T) { } func TestKillConnectionForInlineCid(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - warsaw := newTestEngine(ctx, "warsaw") - riga := newTestEngine(ctx, "riga") + warsaw := newTestEngine("warsaw") + defer warsaw.Engine.Close() + riga := newTestEngine("riga") + defer riga.Engine.Close() if warsaw.Peer == riga.Peer { t.Fatal("Sanity Check: Peers have same Key!") @@ -1715,7 +1696,7 @@ func TestKillConnectionForInlineCid(t *testing.T) { rand.Read(hash[startOfDigest:]) m.AddEntry(cid.NewCidV1(cid.Raw, hash), 0, pb.Message_Wantlist_Block, true) - if !warsaw.Engine.MessageReceived(ctx, riga.Peer, m) { + if !warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) { t.Fatal("connection was not killed when receiving inline in cancel") } @@ -1724,15 +1705,12 @@ func TestKillConnectionForInlineCid(t *testing.T) { m.AddEntry(blocks.NewBlock([]byte("Hæ")).Cid(), 0, pb.Message_Wantlist_Block, true) m.Cancel(cid.NewCidV1(cid.Raw, hash)) - if !warsaw.Engine.MessageReceived(ctx, riga.Peer, m) { + if !warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) { t.Fatal("connection was not killed when receiving inline in cancel") } } func TestWantlistBlocked(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - const limit = 32 bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) @@ -1752,15 +1730,17 @@ func TestWantlistBlocked(t *testing.T) { } fpt := &fakePeerTagger{} - e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit)) + defer e.Close() + warsaw := engineSet{ Peer: peer.ID("warsaw"), PeerTagger: fpt, Blockstore: bs, Engine: e, } - riga := newTestEngine(ctx, "riga") + riga := newTestEngine("riga") + defer riga.Engine.Close() if warsaw.Peer == riga.Peer { t.Fatal("Sanity Check: Peers have same Key!") } @@ -1773,7 +1753,7 @@ func TestWantlistBlocked(t *testing.T) { m.AddEntry(c, 1, pb.Message_Wantlist_Block, true) dontHaveCids[i] = c } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) wl := warsaw.Engine.WantlistForPeer(riga.Peer) // Check that all the dontHave wants are on the wantlist. for _, c := range dontHaveCids { @@ -1787,7 +1767,7 @@ func TestWantlistBlocked(t *testing.T) { for _, c := range haveCids { m.AddEntry(c, 1, pb.Message_Wantlist_Block, true) } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) wl = warsaw.Engine.WantlistForPeer(riga.Peer) // Check that all the dontHave wants are on the wantlist. 
for _, c := range haveCids { @@ -1804,7 +1784,7 @@ func TestWantlistBlocked(t *testing.T) { m.AddEntry(c, 1, pb.Message_Wantlist_Block, true) dontHaveCids[i] = c } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) // Check that all the new dontHave wants are not on the wantlist. for _, c := range dontHaveCids { if findCid(c, wl) { @@ -1815,9 +1795,6 @@ func TestWantlistBlocked(t *testing.T) { } func TestWantlistOverflow(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - const limit = 32 bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) @@ -1838,20 +1815,21 @@ func TestWantlistOverflow(t *testing.T) { } fpt := &fakePeerTagger{} - e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit)) - e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) + e := newEngineForTesting(bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit)) + defer e.Close() warsaw := engineSet{ Peer: peer.ID("warsaw"), PeerTagger: fpt, Blockstore: bs, Engine: e, } - riga := newTestEngine(ctx, "riga") + riga := newTestEngine("riga") + defer riga.Engine.Close() if warsaw.Peer == riga.Peer { t.Fatal("Sanity Check: Peers have same Key!") } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) // Check that the wantlist is at the size limit. wl := warsaw.Engine.WantlistForPeer(riga.Peer) if len(wl) != limit { @@ -1867,7 +1845,7 @@ func TestWantlistOverflow(t *testing.T) { m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) lowPrioCids[i] = c } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) wl = warsaw.Engine.WantlistForPeer(riga.Peer) if len(wl) != limit { t.Fatal("wantlist size", len(wl), "does not match limit", limit) @@ -1893,7 +1871,7 @@ func TestWantlistOverflow(t *testing.T) { m.AddEntry(c, 10, pb.Message_Wantlist_Block, true) highPrioCids[i] = c } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) wl = warsaw.Engine.WantlistForPeer(riga.Peer) if len(wl) != limit { t.Fatal("wantlist size", len(wl), "does not match limit", limit) @@ -1918,7 +1896,7 @@ func TestWantlistOverflow(t *testing.T) { m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) blockCids[i] = c } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) wl = warsaw.Engine.WantlistForPeer(riga.Peer) if len(wl) != limit { t.Fatal("wantlist size", len(wl), "does not match limit", limit) @@ -1942,7 +1920,7 @@ func TestWantlistOverflow(t *testing.T) { for _, c := range origCids { m.AddEntry(c, 0, pb.Message_Wantlist_Block, true) } - warsaw.Engine.MessageReceived(ctx, riga.Peer, m) + warsaw.Engine.MessageReceived(context.Background(), riga.Peer, m) wl = warsaw.Engine.WantlistForPeer(riga.Peer) for _, c := range origCids { if !findCid(c, wl) { diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 46d29a8fc..2b45b324b 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -20,8 +20,6 @@ import ( "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" 
"github.com/ipfs/go-metrics-interface" - process "github.com/jbenet/goprocess" - procctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/zap" ) @@ -57,7 +55,12 @@ type Server struct { // the total number of simultaneous threads sending outgoing messages taskWorkerCount int - process process.Process + // Cancel stops the server + cancel context.CancelFunc + closing chan struct{} + closeOnce sync.Once + // waitWorkers waits for all worker goroutines to exit. + waitWorkers sync.WaitGroup // newBlocks is a channel for newly added blocks to be provided to the // network. blocks pushed down this channel get buffered and fed to the @@ -78,20 +81,13 @@ type Server struct { func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server { ctx, cancel := context.WithCancel(ctx) - px := process.WithTeardown(func() error { - return nil - }) - go func() { - <-px.Closing() // process closes first - cancel() - }() - s := &Server{ sentHistogram: bmetrics.SentHist(ctx), sendTimeHistogram: bmetrics.SendTimeHist(ctx), taskWorkerCount: defaults.BitswapTaskWorkerCount, network: network, - process: px, + cancel: cancel, + closing: make(chan struct{}), provideEnabled: true, hasBlockBufferSize: defaults.HasBlockBufferSize, provideKeys: make(chan cid.Cid, provideKeysBufferSize), @@ -103,7 +99,6 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl } s.engine = decision.NewEngine( - ctx, bstore, network.ConnectionManager(), network.Self(), @@ -111,7 +106,7 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl ) s.engineOptions = nil - s.startWorkers(ctx, px) + s.startWorkers(ctx) return s } @@ -293,33 +288,31 @@ func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid { return out } -func (bs *Server) startWorkers(ctx context.Context, px process.Process) { - bs.engine.StartWorkers(ctx, px) - +func (bs *Server) startWorkers(ctx context.Context) { // Start up workers to handle requests from other nodes for the data on this node + bs.waitWorkers.Add(bs.taskWorkerCount) for i := 0; i < bs.taskWorkerCount; i++ { i := i - px.Go(func(px process.Process) { - bs.taskWorker(ctx, i) - }) + go bs.taskWorker(ctx, i) } if bs.provideEnabled { - // Start up a worker to manage sending out provides messages - px.Go(func(px process.Process) { - bs.provideCollector(ctx) - }) + bs.waitWorkers.Add(1) + go bs.provideCollector(ctx) // Spawn up multiple workers to handle incoming blocks // consider increasing number if providing blocks bottlenecks // file transfers - px.Go(bs.provideWorker) + bs.waitWorkers.Add(1) + go bs.provideWorker(ctx) } } func (bs *Server) taskWorker(ctx context.Context, id int) { - defer log.Debug("bitswap task worker shutting down...") + defer bs.waitWorkers.Done() + log := log.With("ID", id) + defer log.Debug("bitswap task worker shutting down...") for { log.Debug("Bitswap.TaskWorker.Loop") select { @@ -341,8 +334,7 @@ func (bs *Server) taskWorker(ctx context.Context, id int) { } bs.sendBlocks(ctx, envelope) - dur := time.Since(start) - bs.sendTimeHistogram.Observe(dur.Seconds()) + bs.sendTimeHistogram.Observe(time.Since(start).Seconds()) case <-ctx.Done(): return @@ -452,7 +444,7 @@ func (bs *Server) Stat() (Stat, error) { // that those blocks are available in the blockstore before calling this function. 
func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
 	select {
-	case <-bs.process.Closing():
+	case <-bs.closing:
 		return errors.New("bitswap is closed")
 	default:
 	}
@@ -466,8 +458,8 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
 			select {
 			case bs.newBlocks <- blk.Cid():
 				// send block off to be reprovided
-			case <-bs.process.Closing():
-				return bs.process.Close()
+			case <-bs.closing:
+				return nil
 			}
 		}
 	}
@@ -476,6 +468,7 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
 }
 
 func (bs *Server) provideCollector(ctx context.Context) {
+	defer bs.waitWorkers.Done()
 	defer close(bs.provideKeys)
 	var toProvide []cid.Cid
 	var nextKey cid.Cid
@@ -508,18 +501,16 @@ func (bs *Server) provideCollector(ctx context.Context) {
 	}
 }
 
-func (bs *Server) provideWorker(px process.Process) {
-	// FIXME: OnClosingContext returns a _custom_ context type.
-	// Unfortunately, deriving a new cancelable context from this custom
-	// type fires off a goroutine. To work around this, we create a single
-	// cancelable context up-front and derive all sub-contexts from that.
-	//
-	// See: https://github.com/ipfs/go-ipfs/issues/5810
-	ctx := procctx.OnClosingContext(px)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
+func (bs *Server) provideWorker(ctx context.Context) {
 	limit := make(chan struct{}, provideWorkerMax)
+	defer func() {
+		// Wait until all limitGoProvide goroutines are done before declaring
+		// this worker as done.
+		for i := 0; i < provideWorkerMax; i++ {
+			limit <- struct{}{}
+		}
+		bs.waitWorkers.Done()
+	}()
 
 	limitedGoProvide := func(k cid.Cid, wid int) {
 		defer func() {
@@ -540,25 +531,18 @@ func (bs *Server) provideWorker(px process.Process) {
 
 	// worker spawner, reads from bs.provideKeys until it closes, spawning a
 	// _ratelimited_ number of workers to handle each key.
-	for wid := 2; ; wid++ {
+	wid := 2
+	for k := range bs.provideKeys {
 		log.Debug("Bitswap.ProvideWorker.Loop")
-
 		select {
-		case <-px.Closing():
+		case limit <- struct{}{}:
+			go limitedGoProvide(k, wid)
+		case <-ctx.Done():
 			return
-		case k, ok := <-bs.provideKeys:
-			if !ok {
-				log.Debug("provideKeys channel closed")
-				return
-			}
-			select {
-			case <-px.Closing():
-				return
-			case limit <- struct{}{}:
-				go limitedGoProvide(k, wid)
-			}
 		}
+		wid++
 	}
+	log.Debug("provideKeys channel closed")
 }
 
 func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
@@ -597,7 +581,13 @@ func (bs *Server) PeerDisconnected(p peer.ID) {
 	bs.engine.PeerDisconnected(p)
 }
 
-// Close is called to shutdown the Client
-func (bs *Server) Close() error {
-	return bs.process.Close()
+// Close is called to shutdown the Server. Returns after all workers and the
+// decision engine have finished. Safe to call multiple times/concurrently.
+func (bs *Server) Close() { + bs.closeOnce.Do(func() { + close(bs.closing) + bs.cancel() + }) + bs.engine.Close() + bs.waitWorkers.Wait() } diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 28b004559..eea833be3 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -10,9 +10,6 @@ import ( "time" logging "github.com/ipfs/go-log/v2" - "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" - periodicproc "github.com/jbenet/goprocess/periodic" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -112,78 +109,105 @@ func (cfg *BootstrapConfig) SetBackupPeers(load func(context.Context) []peer.Add // connections to well-known bootstrap peers. It also kicks off subsystem // bootstrapping (i.e. routing). func Bootstrap(id peer.ID, host host.Host, rt routing.Routing, cfg BootstrapConfig) (io.Closer, error) { - // make a signal to wait for one bootstrap round to complete. - doneWithRound := make(chan struct{}) - if len(cfg.BootstrapPeers()) == 0 { // We *need* to bootstrap but we have no bootstrap peers // configured *at all*, inform the user. log.Warn("no bootstrap nodes configured: go-ipfs may have difficulty connecting to the network") } - // the periodic bootstrap function -- the connection supervisor - periodic := func(worker goprocess.Process) { - ctx := goprocessctx.OnClosingContext(worker) + ctx, cancel := context.WithCancel(context.Background()) - if err := bootstrapRound(ctx, host, cfg); err != nil { - log.Debugf("%s bootstrap error: %s", id, err) + // Signal when first bootstrap round is complete, started independent of ticker. + doneWithRound := make(chan struct{}) + + go func() { + // the periodic bootstrap function -- the connection supervisor + periodic := func() { + if err := bootstrapRound(ctx, host, cfg); err != nil { + log.Debugf("%s bootstrap error: %s", id, err) + } } - // Exit the first call (triggered independently by `proc.Go`, not `Tick`) - // only after being done with the *single* Routing.Bootstrap call. Following - // periodic calls (`Tick`) will not block on this. + ticker := time.NewTicker(cfg.Period) + defer ticker.Stop() + + // Run first round independent of ticker. + periodic() <-doneWithRound - } + if ctx.Err() != nil { + return + } - // kick off the node's periodic bootstrapping - proc := periodicproc.Tick(cfg.Period, periodic) - proc.Go(periodic) // run one right now. + for { + select { + case <-ticker.C: + periodic() + case <-ctx.Done(): + return + } + } + }() // kick off Routing.Bootstrap if rt != nil { - ctx := goprocessctx.OnClosingContext(proc) if err := rt.Bootstrap(ctx); err != nil { - proc.Close() + cancel() + close(doneWithRound) return nil, err } } - doneWithRound <- struct{}{} - close(doneWithRound) // it no longer blocks periodic - // If loadBackupBootstrapPeers is not nil then saveBackupBootstrapPeers // must also not be nil. if cfg.loadBackupBootstrapPeers != nil { - startSavePeersAsTemporaryBootstrapProc(cfg, host, proc) + doneWithRound <- struct{}{} // wait for first bootstrap + startSavePeersAsTemporaryBootstrapProc(ctx, cfg, host) } - return proc, nil + return &bootstrapCloser{ + cancel: cancel, + }, nil +} + +type bootstrapCloser struct { + cancel context.CancelFunc +} + +func (bsc *bootstrapCloser) Close() error { + bsc.cancel() + return nil } // Aside of the main bootstrap process we also run a secondary one that saves // connected peers as a backup measure if we can't connect to the official // bootstrap ones. 
These peers will serve as *temporary* bootstrap nodes. -func startSavePeersAsTemporaryBootstrapProc(cfg BootstrapConfig, host host.Host, bootstrapProc goprocess.Process) { - savePeersFn := func(worker goprocess.Process) { - ctx := goprocessctx.OnClosingContext(worker) +func startSavePeersAsTemporaryBootstrapProc(ctx context.Context, cfg BootstrapConfig, host host.Host) { + go func() { + periodic := func() { + if err := saveConnectedPeersAsTemporaryBootstrap(ctx, host, cfg); err != nil { + log.Debugf("saveConnectedPeersAsTemporaryBootstrap error: %s", err) + } + } + + ticker := time.NewTicker(cfg.BackupBootstrapInterval) + defer ticker.Stop() - if err := saveConnectedPeersAsTemporaryBootstrap(ctx, host, cfg); err != nil { - log.Debugf("saveConnectedPeersAsTemporaryBootstrap error: %s", err) + // Run the first round now (after the first bootstrap process has + // finished) as the SavePeersPeriod can be much longer than bootstrap. + periodic() + if ctx.Err() != nil { + return } - } - savePeersProc := periodicproc.Tick(cfg.BackupBootstrapInterval, savePeersFn) - // When the main bootstrap process ends also terminate the 'save connected - // peers' ones. Coupling the two seems the easiest way to handle this backup - // process without additional complexity. - go func() { - <-bootstrapProc.Closing() - savePeersProc.Close() + for { + select { + case <-ticker.C: + periodic() + case <-ctx.Done(): + return + } + } }() - - // Run the first round now (after the first bootstrap process has finished) - // as the SavePeersPeriod can be much longer than bootstrap. - savePeersProc.Go(savePeersFn) } func saveConnectedPeersAsTemporaryBootstrap(ctx context.Context, host host.Host, cfg BootstrapConfig) error { diff --git a/go.mod b/go.mod index 54cb060e8..3054db658 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,6 @@ require ( github.com/ipld/go-car/v2 v2.14.2 github.com/ipld/go-codec-dagpb v1.6.0 github.com/ipld/go-ipld-prime v0.21.0 - github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-buffer-pool v0.1.0 github.com/libp2p/go-doh-resolver v0.4.0 github.com/libp2p/go-libp2p v0.37.0 @@ -118,6 +117,7 @@ require ( github.com/ipfs/go-verifcid v0.0.3 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect + github.com/jbenet/goprocess v0.1.4 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect diff --git a/go.sum b/go.sum index 303230175..57bb266c6 100644 --- a/go.sum +++ b/go.sum @@ -233,7 +233,6 @@ github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:wZ8hH8UxeryOs4kJEJaiui/s00hDSbE37OKsL47g+Sw= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= -github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= diff --git a/namesys/republisher/repub.go b/namesys/republisher/repub.go index 7ca2ae932..95e440436 100644 --- a/namesys/republisher/repub.go +++ b/namesys/republisher/repub.go @@ -16,8 +16,6 @@ import ( 
"github.com/ipfs/boxo/ipns" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" - "github.com/jbenet/goprocess" - gpctx "github.com/jbenet/goprocess/context" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" ) @@ -67,8 +65,17 @@ func NewRepublisher(ns namesys.Publisher, ds ds.Datastore, self ic.PrivKey, ks k } } -// Run starts the republisher facility. It can be stopped by stopping the provided proc. -func (rp *Republisher) Run(proc goprocess.Process) { +// Run starts the republisher facility. It can be stopped by calling the returned function.. +func (rp *Republisher) Run() func() { + ctx, cancel := context.WithCancel(context.Background()) + go rp.run(ctx) + return func() { + log.Debug("stopping republisher") + cancel() + } +} + +func (rp *Republisher) run(ctx context.Context) { timer := time.NewTimer(InitialRebroadcastDelay) defer timer.Stop() if rp.Interval < InitialRebroadcastDelay { @@ -79,21 +86,21 @@ func (rp *Republisher) Run(proc goprocess.Process) { select { case <-timer.C: timer.Reset(rp.Interval) - err := rp.republishEntries(proc) + err := rp.republishEntries(ctx) if err != nil { log.Info("republisher failed to republish: ", err) if FailureRetryInterval < rp.Interval { timer.Reset(FailureRetryInterval) } } - case <-proc.Closing(): + case <-ctx.Done(): return } } } -func (rp *Republisher) republishEntries(p goprocess.Process) error { - ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p)) +func (rp *Republisher) republishEntries(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) defer cancel() ctx, span := startSpan(ctx, "Republisher.RepublishEntries") defer span.End() diff --git a/namesys/republisher/repub_test.go b/namesys/republisher/repub_test.go index 88ec04dae..db42b02a9 100644 --- a/namesys/republisher/repub_test.go +++ b/namesys/republisher/repub_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/jbenet/goprocess" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" ic "github.com/libp2p/go-libp2p/core/crypto" @@ -125,8 +124,8 @@ func TestRepublish(t *testing.T) { repub.Interval = time.Second repub.RecordLifetime = time.Second * 5 - proc := goprocess.Go(repub.Run) - defer proc.Close() + stop := repub.Run() + defer stop() // now wait a couple seconds for it to fire time.Sleep(time.Second * 2) @@ -182,8 +181,8 @@ func TestLongEOLRepublish(t *testing.T) { repub.Interval = time.Millisecond * 500 repub.RecordLifetime = time.Second - proc := goprocess.Go(repub.Run) - defer proc.Close() + stop := repub.Run() + defer stop() // now wait a couple seconds for it to fire a few times time.Sleep(time.Second * 2) From 4a077bcb32c2a7be901576cf226ab670d5a3863e Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 30 Oct 2024 11:20:43 -1000 Subject: [PATCH 2/3] client Close needs to return error --- bitswap/client/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bitswap/client/client.go b/bitswap/client/client.go index b4bc91b58..bab03c3cd 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -452,13 +452,14 @@ func (bs *Client) ReceiveError(err error) { } // Close is called to shutdown the Client -func (bs *Client) Close() { +func (bs *Client) Close() error { bs.closeOnce.Do(func() { close(bs.closing) bs.sm.Shutdown() bs.cancel() bs.notif.Shutdown() }) + return nil } // GetWantlist returns the current local wantlist (both want-blocks and From 19d1d6b9df169d38489da37aaefb78e269607ff4 Mon 
Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Tue, 5 Nov 2024 09:51:11 -1000
Subject: [PATCH 3/3] Simplify provide worker goroutines

---
 bitswap/server/server.go | 72 +++++++++++++++-------------------------
 1 file changed, 26 insertions(+), 46 deletions(-)

diff --git a/bitswap/server/server.go b/bitswap/server/server.go
index 2b45b324b..2cc3e2474 100644
--- a/bitswap/server/server.go
+++ b/bitswap/server/server.go
@@ -299,12 +299,7 @@ func (bs *Server) startWorkers(ctx context.Context) {
 	if bs.provideEnabled {
 		bs.waitWorkers.Add(1)
 		go bs.provideCollector(ctx)
-
-		// Spawn up multiple workers to handle incoming blocks
-		// consider increasing number if providing blocks bottlenecks
-		// file transfers
-		bs.waitWorkers.Add(1)
-		go bs.provideWorker(ctx)
+		bs.startProvideWorkers(ctx)
 	}
 }
 
@@ -501,48 +496,33 @@ func (bs *Server) provideCollector(ctx context.Context) {
 	}
 }
 
-func (bs *Server) provideWorker(ctx context.Context) {
-	limit := make(chan struct{}, provideWorkerMax)
-	defer func() {
-		// Wait until all limitGoProvide goroutines are done before declaring
-		// this worker as done.
-		for i := 0; i < provideWorkerMax; i++ {
-			limit <- struct{}{}
-		}
-		bs.waitWorkers.Done()
-	}()
-
-	limitedGoProvide := func(k cid.Cid, wid int) {
-		defer func() {
-			// replace token when done
-			<-limit
-		}()
-
-		log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
-		defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)
-
-		ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
-		defer cancel()
-
-		if err := bs.network.Provide(ctx, k); err != nil {
-			log.Warn(err)
-		}
-	}
-
-	// worker spawner, reads from bs.provideKeys until it closes, spawning a
-	// _ratelimited_ number of workers to handle each key.
-	wid := 2
-	for k := range bs.provideKeys {
-		log.Debug("Bitswap.ProvideWorker.Loop")
-		select {
-		case limit <- struct{}{}:
-			go limitedGoProvide(k, wid)
-		case <-ctx.Done():
-			return
-		}
-		wid++
-	}
-	log.Debug("provideKeys channel closed")
+// startProvideWorkers starts provide worker goroutines that provide CIDs
+// supplied by provideCollector.
+//
+// If providing blocks bottlenecks file transfers, then consider increasing
+// provideWorkerMax.
+func (bs *Server) startProvideWorkers(ctx context.Context) {
+	bs.waitWorkers.Add(provideWorkerMax)
+	for id := 0; id < provideWorkerMax; id++ {
+		go func(wid int) {
+			defer bs.waitWorkers.Done()
+
+			var runCount int
+			// Read bs.provideKeys until it closes, when provideCollector exits.
+			for k := range bs.provideKeys {
+				runCount++
+				log.Debugw("Bitswap provider worker start", "ID", wid, "run", runCount, "cid", k)
+
+				ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout)
+				if err := bs.network.Provide(ctx, k); err != nil {
+					log.Warn(err)
+				}
+				cancel()
+
+				log.Debugw("Bitswap provider worker done", "ID", wid, "run", runCount, "cid", k)
+			}
+		}(id)
+	}
 }
 
 func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {