diff --git a/CHANGELOG.md b/CHANGELOG.md index 7081f01cd..7190373ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,26 @@ The following emojis are used to highlight certain changes: ### Security +## [v0.24.0] + +### Added + +* `boxo/bitswap/server`: + * A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672) +* `routing/http`: + * added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671) [#678](https://github.com/ipfs/boxo/pull/678) + * added support for address and protocol filtering to the delegated routing client ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#678](https://github.com/ipfs/boxo/pull/678). To add filtering to the client, use the [`WithFilterAddrs`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterAddrs) and [`WithFilterProtocols`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterProtocols) options when creating the client.Client-side filtering for servers that don't support filtering is enabled by default. To disable it, use the [`disableLocalFiltering`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#disableLocalFiltering) option when creating the client. + +### Changed + +### Removed + +### Fixed + +- `unixfs/hamt` Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393) + +### Security + ## [v0.23.0] ### Added diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index f5511cc7a..b30bcc87f 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -37,4 +37,7 @@ const ( // RebroadcastDelay is the default delay to trigger broadcast of // random CIDs in the wantlist. RebroadcastDelay = time.Minute + + // DefaultWantHaveReplaceSize controls the implicit behavior of WithWantHaveReplaceSize. + DefaultWantHaveReplaceSize = 1024 ) diff --git a/bitswap/options.go b/bitswap/options.go index 11e89fdf9..6a98b27db 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -71,6 +71,13 @@ func WithTaskComparator(comparator server.TaskComparator) Option { return Option{server.WithTaskComparator(comparator)} } +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which the bitswap server will replace a WantHave with a WantBlock response. +// See [server.WithWantHaveReplaceSize] for details. +func WithWantHaveReplaceSize(size int) Option { + return Option{server.WithWantHaveReplaceSize(size)} +} + func ProviderSearchDelay(newProvSearchDelay time.Duration) Option { return Option{client.ProviderSearchDelay(newProvSearchDelay)} } diff --git a/bitswap/server/internal/decision/blockstoremanager.go b/bitswap/server/internal/decision/blockstoremanager.go index aa16b3126..d4c0f4254 100644 --- a/bitswap/server/internal/decision/blockstoremanager.go +++ b/bitswap/server/internal/decision/blockstoremanager.go @@ -121,6 +121,42 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) ( return res, nil } +func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]struct{}, error) { + if len(ks) == 0 { + return nil, nil + } + hasBlocks := make([]bool, len(ks)) + + var count atomic.Int32 + err := bsm.jobPerKey(ctx, ks, func(i int, c cid.Cid) { + has, err := bsm.bs.Has(ctx, c) + if err != nil { + // Note: this isn't a fatal error. We shouldn't abort the request + log.Errorf("blockstore.Has(%c) error: %s", c, err) + return + } + if has { + hasBlocks[i] = true + count.Add(1) + } + }) + if err != nil { + return nil, err + } + results := count.Load() + if results == 0 { + return nil, nil + } + + res := make(map[cid.Cid]struct{}, results) + for i, ok := range hasBlocks { + if ok { + res[ks[i]] = struct{}{} + } + } + return res, nil +} + func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) { if len(ks) == 0 { return nil, nil diff --git a/bitswap/server/internal/decision/blockstoremanager_test.go b/bitswap/server/internal/decision/blockstoremanager_test.go index f65c88e83..2f2b7b23f 100644 --- a/bitswap/server/internal/decision/blockstoremanager_test.go +++ b/bitswap/server/internal/decision/blockstoremanager_test.go @@ -98,29 +98,22 @@ func TestBlockstoreManager(t *testing.T) { cids = append(cids, b.Cid()) } - sizes, err := bsm.getBlockSizes(ctx, cids) + hasBlocks, err := bsm.hasBlocks(ctx, cids) if err != nil { t.Fatal(err) } - if len(sizes) != len(blks)-1 { + if len(hasBlocks) != len(blks)-1 { t.Fatal("Wrong response length") } - for _, c := range cids { - expSize := len(exp[c].RawData()) - size, ok := sizes[c] - - // Only the last key should be missing + _, ok := hasBlocks[c] if c.Equals(cids[len(cids)-1]) { if ok { t.Fatal("Non-existent block should not be in sizes map") } } else { if !ok { - t.Fatal("Block should be in sizes map") - } - if size != expSize { - t.Fatal("Block has wrong size") + t.Fatal("Block should be in hasBlocks") } } } diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 1174c94c0..5e4463e33 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -77,10 +77,6 @@ const ( // queuedTagWeight is the default weight for peers that have work queued // on their behalf. queuedTagWeight = 10 - - // maxBlockSizeReplaceHasWithBlock is the maximum size of the block in - // bytes up to which we will replace a want-have with a want-block - maxBlockSizeReplaceHasWithBlock = 1024 ) // Envelope contains a message for a Peer. @@ -202,9 +198,9 @@ type Engine struct { targetMessageSize int - // maxBlockSizeReplaceHasWithBlock is the maximum size of the block in - // bytes up to which we will replace a want-have with a want-block - maxBlockSizeReplaceHasWithBlock int + // wantHaveReplaceSize is the maximum size of the block in bytes up to + // which to replace a WantHave with a WantBlock. + wantHaveReplaceSize int sendDontHaves bool @@ -343,6 +339,14 @@ func WithSetSendDontHave(send bool) Option { } } +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which to replace a WantHave with a WantBlock response. +func WithWantHaveReplaceSize(size int) Option { + return func(e *Engine) { + e.wantHaveReplaceSize = size + } +} + // wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { return func(a, b *peertask.QueueTask) bool { @@ -369,32 +373,14 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { } // NewEngine creates a new block sending engine for the given block store. -// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum -// work already outstanding. +// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer +// more tasks if it has some maximum work already outstanding. func NewEngine( ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, opts ...Option, -) *Engine { - return newEngine( - ctx, - bs, - peerTagger, - self, - maxBlockSizeReplaceHasWithBlock, - opts..., - ) -} - -func newEngine( - ctx context.Context, - bs bstore.Blockstore, - peerTagger PeerTagger, - self peer.ID, - maxReplaceSize int, - opts ...Option, ) *Engine { e := &Engine{ scoreLedger: NewDefaultScoreLedger(), @@ -404,7 +390,7 @@ func newEngine( outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}, 1), ticker: time.NewTicker(time.Millisecond * 100), - maxBlockSizeReplaceHasWithBlock: maxReplaceSize, + wantHaveReplaceSize: defaults.DefaultWantHaveReplaceSize, taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, self: self, @@ -445,6 +431,12 @@ func newEngine( e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...) + if e.wantHaveReplaceSize == 0 { + log.Info("Replace WantHave with WantBlock is disabled") + } else { + log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize) + } + return e } @@ -689,16 +681,38 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return true } + noReplace := e.wantHaveReplaceSize == 0 + // Get block sizes for unique CIDs. - wantKs := cid.NewSet() + wantKs := make([]cid.Cid, 0, len(wants)) + var haveKs []cid.Cid for _, entry := range wants { - wantKs.Add(entry.Cid) + if noReplace && entry.WantType == pb.Message_Wantlist_Have { + haveKs = append(haveKs, entry.Cid) + } else { + wantKs = append(wantKs, entry.Cid) + } } - blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys()) + blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs) if err != nil { log.Info("aborting message processing", err) return false } + if len(haveKs) != 0 { + hasBlocks, err := e.bsm.hasBlocks(ctx, haveKs) + if err != nil { + log.Info("aborting message processing", err) + return false + } + if len(hasBlocks) != 0 { + if blockSizes == nil { + blockSizes = make(map[cid.Cid]int, len(hasBlocks)) + } + for blkCid := range hasBlocks { + blockSizes[blkCid] = 0 + } + } + } e.lock.Lock() @@ -707,20 +721,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } var overflow []bsmsg.Entry - if len(wants) != 0 { - filteredWants := wants[:0] // shift inplace - for _, entry := range wants { - if !e.peerLedger.Wants(p, entry.Entry) { - // Cannot add entry because it would exceed size limit. - overflow = append(overflow, entry) - continue - } - filteredWants = append(filteredWants, entry) - } - // Clear truncated entries - early GC. - clear(wants[len(filteredWants):]) - wants = filteredWants - } + wants, overflow = e.filterOverflow(p, wants, overflow) if len(overflow) != 0 { log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow)) @@ -764,7 +765,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap sendDontHave(entry) } - // For each want-have / want-block + // For each want-block for _, entry := range wants { c := entry.Cid blockSize, found := blockSizes[c] @@ -776,7 +777,10 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap continue } // The block was found, add it to the queue - isWantBlock := e.sendAsBlock(entry.WantType, blockSize) + + // Check if this is a want-block or a have-block that can be converted + // to a want-block. + isWantBlock := blockSize != 0 && e.sendAsBlock(entry.WantType, blockSize) log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock) @@ -810,6 +814,25 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return false } +func (e *Engine) filterOverflow(p peer.ID, wants, overflow []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) { + if len(wants) == 0 { + return wants, overflow + } + + filteredWants := wants[:0] // shift inplace + for _, entry := range wants { + if !e.peerLedger.Wants(p, entry.Entry) { + // Cannot add entry because it would exceed size limit. + overflow = append(overflow, entry) + continue + } + filteredWants = append(filteredWants, entry) + } + // Clear truncated entries - early GC. + clear(wants[len(filteredWants):]) + return filteredWants, overflow +} + // handleOverflow processes incoming wants that could not be addded to the peer // ledger without exceeding the peer want limit. These are handled by trying to // make room by canceling existing wants for which there is no block. If this @@ -913,17 +936,17 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([] continue } + if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) { + denials = append(denials, et) + continue + } + if et.WantType == pb.Message_Wantlist_Have { log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c) } else { log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c) } - if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) { - denials = append(denials, et) - continue - } - // Do not take more wants that can be handled. if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) { wants = append(wants, et) @@ -1057,8 +1080,7 @@ func (e *Engine) PeerDisconnected(p peer.ID) { // If the want is a want-have, and it's below a certain size, send the full // block (instead of sending a HAVE) func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool { - isWantBlock := wantType == pb.Message_Wantlist_Block - return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock + return wantType == pb.Message_Wantlist_Block || blockSize <= e.wantHaveReplaceSize } func (e *Engine) numBytesSentTo(p peer.ID) uint64 { diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index 593bbde0f..5cc1375c7 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -188,17 +188,11 @@ func newEngineForTesting( bs blockstore.Blockstore, peerTagger PeerTagger, self peer.ID, - maxReplaceSize int, + wantHaveReplaceSize int, opts ...Option, ) *Engine { - return newEngine( - ctx, - bs, - peerTagger, - self, - maxReplaceSize, - opts..., - ) + opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize)) + return NewEngine(ctx, bs, peerTagger, self, opts...) } func TestOutboxClosedWhenEngineClosed(t *testing.T) { diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 85651a5ef..46d29a8fc 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -251,6 +251,38 @@ func HasBlockBufferSize(count int) Option { } } +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which the bitswap server will replace a WantHave with a WantBlock response. +// +// Behavior: +// - If size > 0: The server may send full blocks instead of just confirming possession +// for blocks up to the specified size. +// - If size = 0: WantHave replacement is disabled entirely. This allows the server to +// skip reading block sizes during WantHave request processing, which can be more +// efficient if the data storage bills "possession" checks and "reads" differently. +// +// Performance considerations: +// - Enabling replacement (size > 0) may reduce network round-trips but requires +// checking block sizes for each WantHave request to decide if replacement should occur. +// - Disabling replacement (size = 0) optimizes server performance by avoiding +// block size checks, potentially reducing infrastructure costs if possession checks +// are less expensive than full reads. +// +// It defaults to [defaults.DefaultWantHaveReplaceSize] +// and the value may change in future releases. +// +// Use this option to set explicit behavior to balance between network +// efficiency, server performance, and potential storage cost optimizations +// based on your specific use case and storage backend. +func WithWantHaveReplaceSize(size int) Option { + if size < 0 { + size = 0 + } + return func(bs *Server) { + bs.engineOptions = append(bs.engineOptions, decision.WithWantHaveReplaceSize(size)) + } +} + // WantlistForPeer returns the currently understood list of blocks requested by a // given peer. func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid { diff --git a/examples/go.mod b/examples/go.mod index fd77bc07e..772c389a8 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -9,7 +9,7 @@ require ( github.com/ipfs/go-datastore v0.6.0 github.com/ipld/go-car/v2 v2.13.1 github.com/ipld/go-ipld-prime v0.21.0 - github.com/libp2p/go-libp2p v0.36.3 + github.com/libp2p/go-libp2p v0.36.4 github.com/libp2p/go-libp2p-routing-helpers v0.7.3 github.com/multiformats/go-multiaddr v0.13.0 github.com/multiformats/go-multicodec v0.9.0 @@ -107,7 +107,7 @@ require ( github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect - github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect + github.com/multiformats/go-multiaddr-dns v0.4.0 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect diff --git a/examples/go.sum b/examples/go.sum index f8d2600ad..d212ad06c 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -271,8 +271,8 @@ github.com/libp2p/go-doh-resolver v0.4.0 h1:gUBa1f1XsPwtpE1du0O+nnZCUqtG7oYi7Bb+ github.com/libp2p/go-doh-resolver v0.4.0/go.mod h1:v1/jwsFusgsWIGX/c6vCRrnJ60x7bhTiq/fs2qt0cAg= github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM= github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= -github.com/libp2p/go-libp2p v0.36.3 h1:NHz30+G7D8Y8YmznrVZZla0ofVANrvBl2c+oARfMeDQ= -github.com/libp2p/go-libp2p v0.36.3/go.mod h1:4Y5vFyCUiJuluEPmpnKYf6WFx5ViKPUYs/ixe9ANFZ8= +github.com/libp2p/go-libp2p v0.36.4 h1:ZaKyKSHBFbzs6CnAYMhaMc5QgV1UoCN+9WXrg8SEwI4= +github.com/libp2p/go-libp2p v0.36.4/go.mod h1:4Y5vFyCUiJuluEPmpnKYf6WFx5ViKPUYs/ixe9ANFZ8= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= github.com/libp2p/go-libp2p-kad-dht v0.25.2 h1:FOIk9gHoe4YRWXTu8SY9Z1d0RILol0TrtApsMDPjAVQ= @@ -331,8 +331,8 @@ github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y9 github.com/multiformats/go-multiaddr v0.13.0 h1:BCBzs61E3AGHcYYTv8dqRH43ZfyrqM8RXVPT8t13tLQ= github.com/multiformats/go-multiaddr v0.13.0/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII= github.com/multiformats/go-multiaddr-dns v0.3.0/go.mod h1:mNzQ4eTGDg0ll1N9jKPOUogZPoJ30W8a7zk66FQPpdQ= -github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A= -github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk= +github.com/multiformats/go-multiaddr-dns v0.4.0 h1:P76EJ3qzBXpUXZ3twdCDx/kvagMsNo0LMFXpyms/zgU= +github.com/multiformats/go-multiaddr-dns v0.4.0/go.mod h1:7hfthtB4E4pQwirrz+J0CcDUfbWzTqEzVyYKKIKpgkc= github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= diff --git a/go.mod b/go.mod index a324e8b71..4d4653804 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-buffer-pool v0.1.0 github.com/libp2p/go-doh-resolver v0.4.0 - github.com/libp2p/go-libp2p v0.36.3 + github.com/libp2p/go-libp2p v0.36.4 github.com/libp2p/go-libp2p-kad-dht v0.25.2 github.com/libp2p/go-libp2p-record v0.2.0 github.com/libp2p/go-libp2p-routing-helpers v0.7.3 @@ -48,7 +48,7 @@ require ( github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-base32 v0.1.0 github.com/multiformats/go-multiaddr v0.13.0 - github.com/multiformats/go-multiaddr-dns v0.3.1 + github.com/multiformats/go-multiaddr-dns v0.4.0 github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 diff --git a/go.sum b/go.sum index b68267e3f..da17cedae 100644 --- a/go.sum +++ b/go.sum @@ -274,8 +274,8 @@ github.com/libp2p/go-doh-resolver v0.4.0 h1:gUBa1f1XsPwtpE1du0O+nnZCUqtG7oYi7Bb+ github.com/libp2p/go-doh-resolver v0.4.0/go.mod h1:v1/jwsFusgsWIGX/c6vCRrnJ60x7bhTiq/fs2qt0cAg= github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM= github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= -github.com/libp2p/go-libp2p v0.36.3 h1:NHz30+G7D8Y8YmznrVZZla0ofVANrvBl2c+oARfMeDQ= -github.com/libp2p/go-libp2p v0.36.3/go.mod h1:4Y5vFyCUiJuluEPmpnKYf6WFx5ViKPUYs/ixe9ANFZ8= +github.com/libp2p/go-libp2p v0.36.4 h1:ZaKyKSHBFbzs6CnAYMhaMc5QgV1UoCN+9WXrg8SEwI4= +github.com/libp2p/go-libp2p v0.36.4/go.mod h1:4Y5vFyCUiJuluEPmpnKYf6WFx5ViKPUYs/ixe9ANFZ8= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= github.com/libp2p/go-libp2p-kad-dht v0.25.2 h1:FOIk9gHoe4YRWXTu8SY9Z1d0RILol0TrtApsMDPjAVQ= @@ -334,8 +334,8 @@ github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y9 github.com/multiformats/go-multiaddr v0.13.0 h1:BCBzs61E3AGHcYYTv8dqRH43ZfyrqM8RXVPT8t13tLQ= github.com/multiformats/go-multiaddr v0.13.0/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII= github.com/multiformats/go-multiaddr-dns v0.3.0/go.mod h1:mNzQ4eTGDg0ll1N9jKPOUogZPoJ30W8a7zk66FQPpdQ= -github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A= -github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk= +github.com/multiformats/go-multiaddr-dns v0.4.0 h1:P76EJ3qzBXpUXZ3twdCDx/kvagMsNo0LMFXpyms/zgU= +github.com/multiformats/go-multiaddr-dns v0.4.0/go.mod h1:7hfthtB4E4pQwirrz+J0CcDUfbWzTqEzVyYKKIKpgkc= github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= diff --git a/ipld/unixfs/hamt/hamt.go b/ipld/unixfs/hamt/hamt.go index a57ddad41..455d070c6 100644 --- a/ipld/unixfs/hamt/hamt.go +++ b/ipld/unixfs/hamt/hamt.go @@ -29,8 +29,6 @@ import ( "os" "sync" - "golang.org/x/sync/errgroup" - format "github.com/ipfs/boxo/ipld/unixfs" "github.com/ipfs/boxo/ipld/unixfs/internal" @@ -38,8 +36,12 @@ import ( bitfield "github.com/ipfs/go-bitfield" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/sync/errgroup" ) +var log = logging.Logger("unixfs-hamt") + const ( // HashMurmur3 is the multiformats identifier for Murmur3 HashMurmur3 uint64 = 0x22 @@ -430,8 +432,13 @@ type listCidsAndShards struct { func (ds *Shard) walkChildren(processLinkValues func(formattedLink *ipld.Link) error) (*listCidsAndShards, error) { res := &listCidsAndShards{} - for idx, lnk := range ds.childer.links { - if nextShard := ds.childer.children[idx]; nextShard == nil { + for i, nextShard := range ds.childer.children { + if nextShard == nil { + lnk := ds.childer.link(i) + if lnk == nil { + log.Warnf("internal HAMT error: both link and shard nil at pos %d, dumping shard: %+v", i, *ds) + return nil, fmt.Errorf("internal HAMT error: both link and shard nil, check log") + } lnkLinkType, err := ds.childLinkType(lnk) if err != nil { return nil, err @@ -454,7 +461,6 @@ func (ds *Shard) walkChildren(processLinkValues func(formattedLink *ipld.Link) e default: return nil, errors.New("unsupported shard link type") } - } else { if nextShard.val != nil { formattedLink := &ipld.Link{ diff --git a/ipld/unixfs/hamt/hamt_test.go b/ipld/unixfs/hamt/hamt_test.go index 1946bbd7c..16b325773 100644 --- a/ipld/unixfs/hamt/hamt_test.go +++ b/ipld/unixfs/hamt/hamt_test.go @@ -749,3 +749,24 @@ func TestHamtBadSize(t *testing.T) { } } } + +func TestHamtNilLinkAndShard(t *testing.T) { + shard, err := NewShard(nil, 1024) + if err != nil { + t.Fatal(err) + } + shard.childer = shard.childer.makeChilder(nil, []*ipld.Link{nil}) + nextShard, err := shard.walkChildren(func(_ *ipld.Link) error { + t.Fatal("processLinkValues function should not have been called") + return nil + }) + if err == nil { + t.Fatal("expected error") + } + if err.Error() != "internal HAMT error: both link and shard nil, check log" { + t.Fatal("did not get expected error") + } + if nextShard != nil { + t.Fatal("nextShard should be nil") + } +} diff --git a/routing/http/client/client.go b/routing/http/client/client.go index 16840cab5..9b85a5066 100644 --- a/routing/http/client/client.go +++ b/routing/http/client/client.go @@ -9,12 +9,15 @@ import ( "io" "mime" "net/http" + gourl "net/url" + "sort" "strings" "time" "github.com/benbjohnson/clock" ipns "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/routing/http/contentrouter" + "github.com/ipfs/boxo/routing/http/filters" "github.com/ipfs/boxo/routing/http/internal/drjson" "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" @@ -52,6 +55,11 @@ type Client struct { // for testing, e.g., testing the server with a mangled signature. //lint:ignore SA1019 // ignore staticcheck afterSignCallback func(req *types.WriteBitswapRecord) + + // disableLocalFiltering is used to disable local filtering of the results + disableLocalFiltering bool + protocolFilter []string + addrFilter []string } // defaultUserAgent is used as a fallback to inform HTTP server which library @@ -83,6 +91,37 @@ func WithIdentity(identity crypto.PrivKey) Option { } } +// WithDisabledLocalFiltering disables local filtering of the results. +// This should be used for delegated routing servers that already implement filtering +func WithDisabledLocalFiltering(val bool) Option { + return func(c *Client) error { + c.disableLocalFiltering = val + return nil + } +} + +// WithProtocolFilter adds a protocol filter to the client. +// The protocol filter is added to the request URL. +// The protocols are ordered alphabetically for cache key (url) consistency +func WithProtocolFilter(protocolFilter []string) Option { + return func(c *Client) error { + sort.Strings(protocolFilter) + c.protocolFilter = protocolFilter + return nil + } +} + +// WithAddrFilter adds an address filter to the client. +// The address filter is added to the request URL. +// The addresses are ordered alphabetically for cache key (url) consistency +func WithAddrFilter(addrFilter []string) Option { + return func(c *Client) error { + sort.Strings(addrFilter) + c.addrFilter = addrFilter + return nil + } +} + // WithHTTPClient sets a custom HTTP Client to be used with [Client]. func WithHTTPClient(h httpClient) Option { return func(c *Client) error { @@ -184,7 +223,12 @@ func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter // TODO test measurements m := newMeasurement("FindProviders") - url := c.baseURL + "/routing/v1/providers/" + key.String() + url, err := gourl.JoinPath(c.baseURL, "routing/v1/providers", key.String()) + if err != nil { + return nil, err + } + url = filters.AddFiltersToURL(url, c.protocolFilter, c.addrFilter) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err @@ -251,6 +295,10 @@ func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter return nil, errors.New("unknown content type") } + if !c.disableLocalFiltering { + it = filters.ApplyFiltersToIter(it, c.addrFilter, c.protocolFilter) + } + return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil } @@ -356,7 +404,12 @@ func (c *Client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) { m := newMeasurement("FindPeers") - url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String() + url, err := gourl.JoinPath(c.baseURL, "routing/v1/peers", peer.ToCid(pid).String()) + if err != nil { + return nil, err + } + url = filters.AddFiltersToURL(url, c.protocolFilter, c.addrFilter) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err @@ -423,6 +476,10 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI return nil, errors.New("unknown content type") } + if !c.disableLocalFiltering { + it = filters.ApplyFiltersToPeerRecordIter(it, c.addrFilter, c.protocolFilter) + } + return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil } diff --git a/routing/http/client/client_test.go b/routing/http/client/client_test.go index 590deed11..0b5dc29f7 100644 --- a/routing/http/client/client_test.go +++ b/routing/http/client/client_test.go @@ -49,7 +49,8 @@ func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit in func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) - return args.Get(0).(*ipns.Record), args.Error(1) + rec, _ := args.Get(0).(*ipns.Record) + return rec, args.Error(1) } func (m *mockContentRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { @@ -158,12 +159,12 @@ func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) { return } -func makePeerRecord() types.PeerRecord { +func makePeerRecord(protocols []string) types.PeerRecord { peerID, addrs, _ := makeProviderAndIdentity() return types.PeerRecord{ Schema: types.SchemaPeer, ID: &peerID, - Protocols: []string{"transport-bitswap"}, + Protocols: protocols, Addrs: addrsToDRAddrs(addrs), Extra: map[string]json.RawMessage{}, } @@ -196,7 +197,7 @@ func makeProviderAndIdentity() (peer.ID, []multiaddr.Multiaddr, crypto.PrivKey) panic(err) } - ma2, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4002") + ma2, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/udp/4002") if err != nil { panic(err) } @@ -222,15 +223,15 @@ func (e *osErrContains) errContains(t *testing.T, err error) { } func TestClient_FindProviders(t *testing.T) { - peerRecord := makePeerRecord() + bitswapPeerRecord := makePeerRecord([]string{"transport-bitswap"}) + httpPeerRecord := makePeerRecord([]string{"transport-ipfs-gateway-http"}) peerProviders := []iter.Result[types.Record]{ - {Val: &peerRecord}, + {Val: &bitswapPeerRecord}, + {Val: &httpPeerRecord}, } bitswapRecord := makeBitswapRecord() - bitswapProviders := []iter.Result[types.Record]{ - {Val: &bitswapRecord}, - } + peerRecordFromBitswapRecord := types.FromBitswapRecord(&bitswapRecord) cases := []struct { name string @@ -240,6 +241,7 @@ func TestClient_FindProviders(t *testing.T) { routerErr error clientRequiresStreaming bool serverStreamingDisabled bool + filterProtocols []string expErrContains osErrContains expResult []iter.Result[types.Record] @@ -252,10 +254,17 @@ func TestClient_FindProviders(t *testing.T) { expResult: peerProviders, expStreamingResponse: true, }, + { + name: "happy case with protocol filter", + filterProtocols: []string{"transport-bitswap"}, + routerResult: peerProviders, + expResult: []iter.Result[types.Record]{{Val: &bitswapPeerRecord}}, + expStreamingResponse: true, + }, { name: "happy case (with deprecated bitswap schema)", - routerResult: bitswapProviders, - expResult: bitswapProviders, + routerResult: []iter.Result[types.Record]{{Val: &bitswapRecord}}, + expResult: []iter.Result[types.Record]{{Val: peerRecordFromBitswapRecord}}, expStreamingResponse: true, }, { @@ -307,6 +316,10 @@ func TestClient_FindProviders(t *testing.T) { }) } + if c.filterProtocols != nil { + clientOpts = append(clientOpts, WithProtocolFilter(c.filterProtocols)) + } + if c.expStreamingResponse { onRespReceived = append(onRespReceived, func(r *http.Response) { assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type")) @@ -484,11 +497,13 @@ func TestClient_Provide(t *testing.T) { } func TestClient_FindPeers(t *testing.T) { - peerRecord := makePeerRecord() + peerRecord1 := makePeerRecord([]string{"transport-bitswap"}) + peerRecord2 := makePeerRecord([]string{"transport-ipfs-gateway-http"}) peerRecords := []iter.Result[*types.PeerRecord]{ - {Val: &peerRecord}, + {Val: &peerRecord1}, + {Val: &peerRecord2}, } - pid := *peerRecord.ID + pid := *peerRecord1.ID cases := []struct { name string @@ -498,6 +513,7 @@ func TestClient_FindPeers(t *testing.T) { routerErr error clientRequiresStreaming bool serverStreamingDisabled bool + filterProtocols []string expErrContains osErrContains expResult []iter.Result[*types.PeerRecord] @@ -510,6 +526,13 @@ func TestClient_FindPeers(t *testing.T) { expResult: peerRecords, expStreamingResponse: true, }, + { + name: "happy case with protocol filter", + filterProtocols: []string{"transport-bitswap"}, + routerResult: peerRecords, + expResult: []iter.Result[*types.PeerRecord]{{Val: &peerRecord1}}, + expStreamingResponse: true, + }, { name: "server doesn't support streaming", routerResult: peerRecords, @@ -544,12 +567,10 @@ func TestClient_FindPeers(t *testing.T) { } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - var ( - clientOpts []Option - serverOpts []server.Option - onRespReceived []func(*http.Response) - onReqReceived []func(*http.Request) - ) + var clientOpts []Option + var serverOpts []server.Option + var onRespReceived []func(*http.Response) + var onReqReceived []func(*http.Request) if c.serverStreamingDisabled { serverOpts = append(serverOpts, server.WithStreamingResultsDisabled()) @@ -562,6 +583,10 @@ func TestClient_FindPeers(t *testing.T) { }) } + if c.filterProtocols != nil { + clientOpts = append(clientOpts, WithProtocolFilter(c.filterProtocols)) + } + if c.expStreamingResponse { onRespReceived = append(onRespReceived, func(r *http.Response) { assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type")) @@ -605,7 +630,7 @@ func TestClient_FindPeers(t *testing.T) { resultIter, err := client.FindPeers(ctx, pid) c.expErrContains.errContains(t, err) - results := iter.ReadAll[iter.Result[*types.PeerRecord]](resultIter) + results := iter.ReadAll(resultIter) assert.Equal(t, c.expResult, results) }) } diff --git a/routing/http/filters/filters.go b/routing/http/filters/filters.go new file mode 100644 index 000000000..ae7aad18f --- /dev/null +++ b/routing/http/filters/filters.go @@ -0,0 +1,245 @@ +package filters + +import ( + "net/url" + "reflect" + "slices" + "strings" + + "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" + logging "github.com/ipfs/go-log/v2" + "github.com/multiformats/go-multiaddr" +) + +var logger = logging.Logger("routing/http/filters") + +// Package filters implements IPIP-0484 + +func ParseFilter(param string) []string { + if param == "" { + return nil + } + return strings.Split(strings.ToLower(param), ",") +} + +func AddFiltersToURL(baseURL string, protocolFilter, addrFilter []string) string { + parsedURL, err := url.Parse(baseURL) + if err != nil { + return baseURL + } + + query := parsedURL.Query() + + if len(protocolFilter) > 0 { + query.Set("filter-protocols", strings.Join(protocolFilter, ",")) + } + + if len(addrFilter) > 0 { + query.Set("filter-addrs", strings.Join(addrFilter, ",")) + } + + parsedURL.RawQuery = query.Encode() + return parsedURL.String() +} + +// applyFiltersToIter applies the filters to the given iterator and returns a new iterator. +// +// The function iterates over the input iterator, applying the specified filters to each record. +// It supports both positive and negative filters for both addresses and protocols. +// +// Parameters: +// - recordsIter: An iterator of types.Record to be filtered. +// - filterAddrs: A slice of strings representing the address filter criteria. +// - filterProtocols: A slice of strings representing the protocol filter criteria. +func ApplyFiltersToIter(recordsIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) iter.ResultIter[types.Record] { + mappedIter := iter.Map(recordsIter, func(v iter.Result[types.Record]) iter.Result[types.Record] { + if v.Err != nil || v.Val == nil { + return v + } + + switch v.Val.GetSchema() { + case types.SchemaPeer: + record, ok := v.Val.(*types.PeerRecord) + if !ok { + logger.Errorw("problem casting find providers record", "Schema", v.Val.GetSchema(), "Type", reflect.TypeOf(v).String()) + // drop failed type assertion + return iter.Result[types.Record]{} + } + + record = applyFilters(record, filterAddrs, filterProtocols) + if record == nil { + return iter.Result[types.Record]{} + } + v.Val = record + + //lint:ignore SA1019 // ignore staticcheck + case types.SchemaBitswap: + //lint:ignore SA1019 // ignore staticcheck + record, ok := v.Val.(*types.BitswapRecord) + if !ok { + logger.Errorw("problem casting find providers record", "Schema", v.Val.GetSchema(), "Type", reflect.TypeOf(v).String()) + // drop failed type assertion + return iter.Result[types.Record]{} + } + peerRecord := types.FromBitswapRecord(record) + peerRecord = applyFilters(peerRecord, filterAddrs, filterProtocols) + if peerRecord == nil { + return iter.Result[types.Record]{} + } + v.Val = peerRecord + } + return v + }) + + // filter out nil results and errors + filteredIter := iter.Filter(mappedIter, func(v iter.Result[types.Record]) bool { + return v.Err == nil && v.Val != nil + }) + + return filteredIter +} + +func ApplyFiltersToPeerRecordIter(peerRecordIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string) iter.ResultIter[*types.PeerRecord] { + // Convert PeerRecord to Record so that we can reuse the filtering logic from findProviders + mappedIter := iter.Map(peerRecordIter, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] { + if v.Err != nil || v.Val == nil { + return iter.Result[types.Record]{Err: v.Err} + } + + var record types.Record = v.Val + return iter.Result[types.Record]{Val: record} + }) + + filteredIter := ApplyFiltersToIter(mappedIter, filterAddrs, filterProtocols) + + // Convert Record back to PeerRecord 🙃 + return iter.Map(filteredIter, func(v iter.Result[types.Record]) iter.Result[*types.PeerRecord] { + if v.Err != nil || v.Val == nil { + return iter.Result[*types.PeerRecord]{Err: v.Err} + } + + var record *types.PeerRecord = v.Val.(*types.PeerRecord) + return iter.Result[*types.PeerRecord]{Val: record} + }) +} + +// Applies the filters. Returns nil if the provider does not pass the protocols filter +// The address filter is more complicated because it potentially modifies the Addrs slice. +func applyFilters(provider *types.PeerRecord, filterAddrs, filterProtocols []string) *types.PeerRecord { + if len(filterAddrs) == 0 && len(filterProtocols) == 0 { + return provider + } + + if !protocolsAllowed(provider.Protocols, filterProtocols) { + // If the provider doesn't match any of the passed protocols, the provider is omitted from the response. + return nil + } + + // return untouched if there's no filter or filterAddrsQuery contains "unknown" and provider has no addrs + if len(filterAddrs) == 0 || (len(provider.Addrs) == 0 && slices.Contains(filterAddrs, "unknown")) { + return provider + } + + filteredAddrs := applyAddrFilter(provider.Addrs, filterAddrs) + + // If filtering resulted in no addrs, omit the provider + if len(filteredAddrs) == 0 { + return nil + } + + provider.Addrs = filteredAddrs + return provider +} + +// applyAddrFilter filters a list of multiaddresses based on the provided filter query. +// +// Parameters: +// - addrs: A slice of types.Multiaddr to be filtered. +// - filterAddrsQuery: A slice of strings representing the filter criteria. +// +// The function supports both positive and negative filters: +// - Positive filters (e.g., "tcp", "udp") include addresses that match the specified protocols. +// - Negative filters (e.g., "!tcp", "!udp") exclude addresses that match the specified protocols. +// +// If no filters are provided, the original list of addresses is returned unchanged. +// If only negative filters are provided, addresses not matching any negative filter are included. +// If positive filters are provided, only addresses matching at least one positive filter (and no negative filters) are included. +// If both positive and negative filters are provided, the address must match at least one positive filter and no negative filters to be included. +// +// Returns: +// A new slice of types.Multiaddr containing only the addresses that pass the filter criteria. +func applyAddrFilter(addrs []types.Multiaddr, filterAddrsQuery []string) []types.Multiaddr { + if len(filterAddrsQuery) == 0 { + return addrs + } + + var filteredAddrs []types.Multiaddr + var positiveFilters, negativeFilters []multiaddr.Protocol + + // Separate positive and negative filters + for _, filter := range filterAddrsQuery { + if strings.HasPrefix(filter, "!") { + negativeFilters = append(negativeFilters, multiaddr.ProtocolWithName(filter[1:])) + } else { + positiveFilters = append(positiveFilters, multiaddr.ProtocolWithName(filter)) + } + } + + for _, addr := range addrs { + protocols := addr.Protocols() + + // Check negative filters + if containsAny(protocols, negativeFilters) { + continue + } + + // If no positive filters or matches a positive filter, include the address + if len(positiveFilters) == 0 || containsAny(protocols, positiveFilters) { + filteredAddrs = append(filteredAddrs, addr) + } + } + + return filteredAddrs +} + +// Helper function to check if protocols contain any of the filters +func containsAny(protocols []multiaddr.Protocol, filters []multiaddr.Protocol) bool { + for _, filter := range filters { + if containsProtocol(protocols, filter) { + return true + } + } + return false +} + +func containsProtocol(protos []multiaddr.Protocol, proto multiaddr.Protocol) bool { + for _, p := range protos { + if p.Code == proto.Code { + return true + } + } + return false +} + +// protocolsAllowed returns true if the peerProtocols are allowed by the filter protocols. +func protocolsAllowed(peerProtocols []string, filterProtocols []string) bool { + if len(filterProtocols) == 0 { + // If no filter is passed, do not filter + return true + } + + for _, filterProtocol := range filterProtocols { + if filterProtocol == "unknown" && len(peerProtocols) == 0 { + return true + } + + for _, peerProtocol := range peerProtocols { + if strings.EqualFold(peerProtocol, filterProtocol) { + return true + } + + } + } + return false +} diff --git a/routing/http/filters/filters_test.go b/routing/http/filters/filters_test.go new file mode 100644 index 000000000..ac6219bd7 --- /dev/null +++ b/routing/http/filters/filters_test.go @@ -0,0 +1,379 @@ +package filters + +import ( + "testing" + + "github.com/ipfs/boxo/routing/http/types" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAddFiltersToURL(t *testing.T) { + testCases := []struct { + name string + baseURL string + protocolFilter []string + addrFilter []string + expected string + }{ + { + name: "No filters", + baseURL: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", + protocolFilter: nil, + addrFilter: nil, + expected: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", + }, + { + name: "Only protocol filter", + baseURL: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", + protocolFilter: []string{"transport-bitswap", "transport-ipfs-gateway-http"}, + addrFilter: nil, + expected: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi?filter-protocols=transport-bitswap%2Ctransport-ipfs-gateway-http", + }, + { + name: "Only addr filter", + baseURL: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", + protocolFilter: nil, + addrFilter: []string{"ip4", "ip6"}, + expected: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi?filter-addrs=ip4%2Cip6", + }, + { + name: "Both filters", + baseURL: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", + protocolFilter: []string{"transport-bitswap", "transport-graphsync-filecoinv1"}, + addrFilter: []string{"ip4", "ip6"}, + expected: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi?filter-addrs=ip4%2Cip6&filter-protocols=transport-bitswap%2Ctransport-graphsync-filecoinv1", + }, + { + name: "URL with existing query parameters", + baseURL: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi?existing=param", + protocolFilter: []string{"transport-bitswap"}, + addrFilter: []string{"ip4"}, + expected: "https://example.com/routing/v1/providers/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi?existing=param&filter-addrs=ip4&filter-protocols=transport-bitswap", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := AddFiltersToURL(tc.baseURL, tc.protocolFilter, tc.addrFilter) + assert.Equal(t, tc.expected, result) + }) + } +} + +func TestApplyAddrFilter(t *testing.T) { + // Create some test multiaddrs + addr1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt") + addr2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/udp/4001/quic/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt") + addr3, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/ws/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt") + addr4, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/tcp/4001/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt") + addr5, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/udp/4001/quic-v1/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt") + addr6, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/certhash/uEiD9f05PrY82lovP4gOFonmY7sO0E7_jyovt9p2LEcAS-Q/certhash/uEiBtGJsNz-PcywwXOVzEYeQQloQiHMqDqdj18t2Fe4GTLQ/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt") + addr7, _ := multiaddr.NewMultiaddr("/dns4/ny5.bootstrap.libp2p.io/tcp/443/wss/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt") + addr8, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/udp/4001/quic-v1/webtransport/certhash/uEiAMrMcVWFNiqtSeRXZTwHTac4p9WcGh5hg8kVBzTC1JTA/certhash/uEiA4dfvbbbnBIYalhp1OpW1Bk-nuWIKSy21ol6vPea67Cw/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt") + + addrs := []types.Multiaddr{ + {Multiaddr: addr1}, + {Multiaddr: addr2}, + {Multiaddr: addr3}, + {Multiaddr: addr4}, + {Multiaddr: addr5}, + {Multiaddr: addr6}, + {Multiaddr: addr7}, + {Multiaddr: addr8}, + } + + testCases := []struct { + name string + filterAddrs []string + expectedAddrs []types.Multiaddr + }{ + { + name: "No filter", + filterAddrs: []string{}, + expectedAddrs: addrs, + }, + { + name: "Filter TCP", + filterAddrs: []string{"tcp"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr1}, {Multiaddr: addr3}, {Multiaddr: addr4}, {Multiaddr: addr7}}, + }, + { + name: "Filter UDP", + filterAddrs: []string{"udp"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr2}, {Multiaddr: addr5}, {Multiaddr: addr6}, {Multiaddr: addr8}}, + }, + { + name: "Filter WebSocket", + filterAddrs: []string{"ws"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr3}}, + }, + { + name: "Exclude TCP", + filterAddrs: []string{"!tcp"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr2}, {Multiaddr: addr5}, {Multiaddr: addr6}, {Multiaddr: addr8}}, + }, + { + name: "Filter TCP addresses that don't have WebSocket and p2p-circuit", + filterAddrs: []string{"tcp", "!ws", "!wss", "!p2p-circuit"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr1}}, + }, + { + name: "Include WebTransport and exclude p2p-circuit", + filterAddrs: []string{"webtransport", "!p2p-circuit"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr8}}, + }, + { + name: "empty for unknown protocol nae", + filterAddrs: []string{"fakeproto"}, + expectedAddrs: []types.Multiaddr{}, + }, + { + name: "Include WebTransport but ignore unknown protocol name", + filterAddrs: []string{"webtransport", "fakeproto"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr6}, {Multiaddr: addr8}}, + }, + { + name: "Multiple filters", + filterAddrs: []string{"tcp", "ws"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr1}, {Multiaddr: addr3}, {Multiaddr: addr4}, {Multiaddr: addr7}}, + }, + { + name: "Multiple negative filters", + filterAddrs: []string{"!tcp", "!ws"}, + expectedAddrs: []types.Multiaddr{{Multiaddr: addr2}, {Multiaddr: addr5}, {Multiaddr: addr6}, {Multiaddr: addr8}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := applyAddrFilter(addrs, tc.filterAddrs) + assert.Equal(t, len(tc.expectedAddrs), len(result), "Unexpected number of addresses after filtering") + + // Check that each expected address is in the result + for _, expectedAddr := range tc.expectedAddrs { + found := false + for _, resultAddr := range result { + if expectedAddr.Multiaddr.Equal(resultAddr.Multiaddr) { + found = true + break + } + } + assert.True(t, found, "Expected address not found in test %s result: %s", tc.name, expectedAddr.Multiaddr) + } + + // Check that each result address is in the expected list + for _, resultAddr := range result { + found := false + for _, expectedAddr := range tc.expectedAddrs { + if resultAddr.Multiaddr.Equal(expectedAddr.Multiaddr) { + found = true + break + } + } + assert.True(t, found, "Unexpected address found in test %s result: %s", tc.name, resultAddr.Multiaddr) + } + }) + } +} + +func TestProtocolsAllowed(t *testing.T) { + testCases := []struct { + name string + peerProtocols []string + filterProtocols []string + expected bool + }{ + { + name: "No filter", + peerProtocols: []string{"transport-bitswap", "transport-ipfs-gateway-http"}, + filterProtocols: []string{}, + expected: true, + }, + { + name: "Single matching protocol", + peerProtocols: []string{"transport-bitswap", "transport-ipfs-gateway-http"}, + filterProtocols: []string{"transport-bitswap"}, + expected: true, + }, + { + name: "Single non-matching protocol", + peerProtocols: []string{"transport-bitswap", "transport-ipfs-gateway-http"}, + filterProtocols: []string{"transport-graphsync-filecoinv1"}, + expected: false, + }, + { + name: "Multiple protocols, one match", + peerProtocols: []string{"transport-bitswap", "transport-ipfs-gateway-http"}, + filterProtocols: []string{"transport-graphsync-filecoinv1", "transport-ipfs-gateway-http"}, + expected: true, + }, + { + name: "Unknown protocol for empty peer protocols", + peerProtocols: []string{}, + filterProtocols: []string{"unknown"}, + expected: true, + }, + { + name: "Unknown protocol for non-empty peer protocols", + peerProtocols: []string{"transport-bitswap"}, + filterProtocols: []string{"unknown"}, + expected: false, + }, + { + name: "Unknown or specific protocol for matching non-empty peer protocols", + peerProtocols: []string{"transport-bitswap"}, + filterProtocols: []string{"unknown", "transport-bitswap", "transport-ipfs-gateway-http"}, + expected: true, + }, + { + name: "Unknown or specific protocol for matching empty peer protocols", + peerProtocols: []string{}, + filterProtocols: []string{"unknown", "transport-bitswap", "transport-ipfs-gateway-http"}, + expected: true, + }, + { + name: "Unknown or specific protocol for not matching non-empty peer protocols", + peerProtocols: []string{"transport-graphsync-filecoinv1"}, + filterProtocols: []string{"unknown", "transport-bitswap", "transport-ipfs-gateway-http"}, + expected: false, + }, + { + name: "Case insensitive match", + peerProtocols: []string{"TRANSPORT-BITSWAP", "Transport-IPFS-Gateway-HTTP"}, + filterProtocols: []string{"transport-bitswap"}, + expected: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := protocolsAllowed(tc.peerProtocols, tc.filterProtocols) + assert.Equal(t, tc.expected, result, "Unexpected result for test case: %s", tc.name) + }) + } +} + +func TestApplyFilters(t *testing.T) { + pid, err := peer.Decode("12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn") + require.NoError(t, err) + + tests := []struct { + name string + provider *types.PeerRecord + filterAddrs []string + filterProtocols []string + expected *types.PeerRecord + }{ + { + name: "No filters", + provider: &types.PeerRecord{ + ID: &pid, + Addrs: []types.Multiaddr{ + mustMultiaddr(t, "/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"), + mustMultiaddr(t, "/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"), + }, + Protocols: []string{"transport-ipfs-gateway-http"}, + }, + filterAddrs: []string{}, + filterProtocols: []string{}, + expected: &types.PeerRecord{ + ID: &pid, + Addrs: []types.Multiaddr{ + mustMultiaddr(t, "/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"), + mustMultiaddr(t, "/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"), + }, + Protocols: []string{"transport-ipfs-gateway-http"}, + }, + }, + { + name: "Protocol filter", + provider: &types.PeerRecord{ + ID: &pid, + Addrs: []types.Multiaddr{ + mustMultiaddr(t, "/ip4/127.0.0.1/tcp/4001"), + mustMultiaddr(t, "/ip4/127.0.0.1/udp/4001/quic-v1"), + mustMultiaddr(t, "/ip4/127.0.0.1/tcp/4001/ws"), + mustMultiaddr(t, "/ip4/102.101.1.1/tcp/4001/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"), + mustMultiaddr(t, "/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"), + mustMultiaddr(t, "/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"), + }, + Protocols: []string{"transport-ipfs-gateway-http"}, + }, + filterAddrs: []string{}, + filterProtocols: []string{"transport-ipfs-gateway-http", "transport-bitswap"}, + expected: &types.PeerRecord{ + ID: &pid, + Addrs: []types.Multiaddr{ + mustMultiaddr(t, "/ip4/127.0.0.1/tcp/4001"), + mustMultiaddr(t, "/ip4/127.0.0.1/udp/4001/quic-v1"), + mustMultiaddr(t, "/ip4/127.0.0.1/tcp/4001/ws"), + mustMultiaddr(t, "/ip4/102.101.1.1/tcp/4001/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"), + mustMultiaddr(t, "/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"), + mustMultiaddr(t, "/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"), + }, + Protocols: []string{"transport-ipfs-gateway-http"}, + }, + }, + { + name: "Address filter", + provider: &types.PeerRecord{ + ID: &pid, + Addrs: []types.Multiaddr{ + mustMultiaddr(t, "/ip4/127.0.0.1/tcp/4001"), + mustMultiaddr(t, "/ip4/127.0.0.1/udp/4001/quic-v1"), + mustMultiaddr(t, "/ip4/127.0.0.1/tcp/4001/ws"), + mustMultiaddr(t, "/ip4/127.0.0.1/udp/4001/webrtc-direct/certhash/uEiCZqN653gMqxrWNmYuNg7Emwb-wvtsuzGE3XD6rypViZA"), + mustMultiaddr(t, "/ip4/102.101.1.1/tcp/4001/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"), + mustMultiaddr(t, "/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"), + mustMultiaddr(t, "/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"), + }, + Protocols: []string{"transport-ipfs-gateway-http"}, + }, + filterAddrs: []string{"webtransport", "wss", "webrtc-direct", "!p2p-circuit"}, + filterProtocols: []string{"transport-ipfs-gateway-http", "transport-bitswap"}, + expected: &types.PeerRecord{ + ID: &pid, + Addrs: []types.Multiaddr{ + mustMultiaddr(t, "/ip4/127.0.0.1/udp/4001/webrtc-direct/certhash/uEiCZqN653gMqxrWNmYuNg7Emwb-wvtsuzGE3XD6rypViZA"), + mustMultiaddr(t, "/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"), + }, + Protocols: []string{"transport-ipfs-gateway-http"}, + }, + }, + { + name: "Unknown protocol filter", + provider: &types.PeerRecord{ + ID: &pid, + Addrs: []types.Multiaddr{ + mustMultiaddr(t, "/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"), + }, + }, + filterAddrs: []string{}, + filterProtocols: []string{"unknown"}, + expected: &types.PeerRecord{ + ID: &pid, + Addrs: []types.Multiaddr{ + mustMultiaddr(t, "/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"), + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := applyFilters(tt.provider, tt.filterAddrs, tt.filterProtocols) + assert.Equal(t, tt.expected, result) + }) + } +} + +func mustMultiaddr(t *testing.T, s string) types.Multiaddr { + addr, err := multiaddr.NewMultiaddr(s) + if err != nil { + t.Fatalf("Failed to create multiaddr: %v", err) + } + return types.Multiaddr{Multiaddr: addr} +} diff --git a/routing/http/server/server.go b/routing/http/server/server.go index 1e1a84770..a7f9385b6 100644 --- a/routing/http/server/server.go +++ b/routing/http/server/server.go @@ -15,6 +15,7 @@ import ( "github.com/cespare/xxhash/v2" "github.com/gorilla/mux" "github.com/ipfs/boxo/ipns" + "github.com/ipfs/boxo/routing/http/filters" "github.com/ipfs/boxo/routing/http/internal/drjson" "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" @@ -194,6 +195,11 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) { return } + // Parse query parameters + query := httpReq.URL.Query() + filterAddrs := filters.ParseFilter(query.Get("filter-addrs")) + filterProtocols := filters.ParseFilter(query.Get("filter-protocols")) + mediaType, err := s.detectResponseType(httpReq) if err != nil { writeErr(w, "FindProviders", http.StatusBadRequest, err) @@ -201,7 +207,7 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) { } var ( - handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) + handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) recordsLimit int ) @@ -224,13 +230,14 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) { } } - handlerFunc(w, provIter) + handlerFunc(w, provIter, filterAddrs, filterProtocols) } -func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) { +func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) { defer provIter.Close() - providers, err := iter.ReadAllResults(provIter) + filteredIter := filters.ApplyFiltersToIter(provIter, filterAddrs, filterProtocols) + providers, err := iter.ReadAllResults(filteredIter) if err != nil { writeErr(w, "FindProviders", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) return @@ -240,9 +247,10 @@ func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIt Providers: providers, }) } +func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) { + filteredIter := filters.ApplyFiltersToIter(provIter, filterAddrs, filterProtocols) -func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) { - writeResultsIterNDJSON(w, provIter) + writeResultsIterNDJSON(w, filteredIter) } func (s *server) findPeers(w http.ResponseWriter, r *http.Request) { @@ -277,6 +285,10 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) { return } + query := r.URL.Query() + filterAddrs := filters.ParseFilter(query.Get("filter-addrs")) + filterProtocols := filters.ParseFilter(query.Get("filter-protocols")) + mediaType, err := s.detectResponseType(r) if err != nil { writeErr(w, "FindPeers", http.StatusBadRequest, err) @@ -284,7 +296,7 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) { } var ( - handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord]) + handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string) recordsLimit int ) @@ -307,7 +319,7 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) { } } - handlerFunc(w, provIter) + handlerFunc(w, provIter, filterAddrs, filterProtocols) } func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) { @@ -369,10 +381,13 @@ func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) { writeJSONResult(w, "Provide", resp) } -func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) { +func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string) { defer peersIter.Close() + peersIter = filters.ApplyFiltersToPeerRecordIter(peersIter, filterAddrs, filterProtocols) + peers, err := iter.ReadAllResults(peersIter) + if err != nil { writeErr(w, "FindPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) return @@ -383,8 +398,19 @@ func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[ }) } -func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) { - writeResultsIterNDJSON(w, peersIter) +func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string) { + // Convert PeerRecord to Record so that we can reuse the filtering logic from findProviders + mappedIter := iter.Map(peersIter, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] { + if v.Err != nil || v.Val == nil { + return iter.Result[types.Record]{Err: v.Err} + } + + var record types.Record = v.Val + return iter.Result[types.Record]{Val: record} + }) + + filteredIter := filters.ApplyFiltersToIter(mappedIter, filterAddrs, filterProtocols) + writeResultsIterNDJSON(w, filteredIter) } func (s *server) GetIPNS(w http.ResponseWriter, r *http.Request) { @@ -572,7 +598,7 @@ func logErr(method, msg string, err error) { logger.Infow(msg, "Method", method, "Error", err) } -func writeResultsIterNDJSON[T any](w http.ResponseWriter, resultIter iter.ResultIter[T]) { +func writeResultsIterNDJSON[T types.Record](w http.ResponseWriter, resultIter iter.ResultIter[T]) { defer resultIter.Close() w.Header().Set("Content-Type", mediaTypeNDJSON) diff --git a/routing/http/server/server_test.go b/routing/http/server/server_test.go index 3f4e7906a..bf84e4155 100644 --- a/routing/http/server/server_test.go +++ b/routing/http/server/server_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "fmt" "io" "net/http" "net/http/httptest" @@ -15,6 +16,7 @@ import ( "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/path" + "github.com/ipfs/boxo/routing/http/filters" "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" "github.com/ipfs/go-cid" @@ -22,6 +24,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" b58 "github.com/mr-tron/base58/base58" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -93,6 +96,13 @@ func TestProviders(t *testing.T) { pid2Str := "12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz" cidStr := "bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4" + addr1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + addr2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/udp/4001/quic-v1") + addr3, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/ws") + addr4, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/tcp/4001/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit") + addr5, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit") + addr6, _ := multiaddr.NewMultiaddr("/ip4/8.8.8.8/udp/4001/quic-v1/webtransport") + pid, err := peer.Decode(pidStr) require.NoError(t, err) pid2, err := peer.Decode(pid2Str) @@ -101,7 +111,7 @@ func TestProviders(t *testing.T) { cid, err := cid.Decode(cidStr) require.NoError(t, err) - runTest := func(t *testing.T, contentType string, empty bool, expectedStream bool, expectedBody string) { + runTest := func(t *testing.T, contentType string, filterAddrs, filterProtocols string, empty bool, expectedStream bool, expectedBody string) { t.Parallel() var results *iter.SliceIter[iter.Result[types.Record]] @@ -114,16 +124,22 @@ func TestProviders(t *testing.T) { Schema: types.SchemaPeer, ID: &pid, Protocols: []string{"transport-bitswap"}, + Addrs: []types.Multiaddr{ + {Multiaddr: addr1}, + {Multiaddr: addr2}, + {Multiaddr: addr3}, + {Multiaddr: addr4}, + {Multiaddr: addr5}, + {Multiaddr: addr6}, + }, + }}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid2, + Protocols: []string{"transport-ipfs-gateway-http"}, Addrs: []types.Multiaddr{}, }}, - //lint:ignore SA1019 // ignore staticcheck - {Val: &types.BitswapRecord{ - //lint:ignore SA1019 // ignore staticcheck - Schema: types.SchemaBitswap, - ID: &pid2, - Protocol: "transport-bitswap", - Addrs: []types.Multiaddr{}, - }}}, + }, ) } @@ -136,7 +152,9 @@ func TestProviders(t *testing.T) { limit = DefaultStreamingRecordsLimit } router.On("FindProviders", mock.Anything, cid, limit).Return(results, nil) - urlStr := serverAddr + "/routing/v1/providers/" + cidStr + + urlStr := fmt.Sprintf("%s/routing/v1/providers/%s", serverAddr, cidStr) + urlStr = filters.AddFiltersToURL(urlStr, strings.Split(filterProtocols, ","), strings.Split(filterAddrs, ",")) req, err := http.NewRequest(http.MethodGet, urlStr, nil) require.NoError(t, err) @@ -174,29 +192,55 @@ func TestProviders(t *testing.T) { } t.Run("JSON Response", func(t *testing.T) { - runTest(t, mediaTypeJSON, false, false, `{"Providers":[{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"},{"Schema":"bitswap","Protocol":"transport-bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz"}]}`) + runTest(t, mediaTypeJSON, "", "", false, false, `{"Providers":[{"Addrs":["/ip4/127.0.0.1/tcp/4001","/ip4/127.0.0.1/udp/4001/quic-v1","/ip4/127.0.0.1/tcp/4001/ws","/ip4/102.101.1.1/tcp/4001/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit","/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit","/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"},{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-ipfs-gateway-http"],"Schema":"peer"}]}`) + }) + + t.Run("JSON Response with addr filtering including unknown", func(t *testing.T) { + runTest(t, mediaTypeJSON, "webtransport,!p2p-circuit,unknown", "", false, false, `{"Providers":[{"Addrs":["/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"},{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-ipfs-gateway-http"],"Schema":"peer"}]}`) + }) + + t.Run("JSON Response with addr filtering", func(t *testing.T) { + runTest(t, mediaTypeJSON, "webtransport,!p2p-circuit", "", false, false, `{"Providers":[{"Addrs":["/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}]}`) + }) + + t.Run("JSON Response with protocol and addr filtering", func(t *testing.T) { + runTest(t, mediaTypeJSON, "quic-v1", "transport-bitswap", false, false, + `{"Providers":[{"Addrs":["/ip4/127.0.0.1/udp/4001/quic-v1","/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit","/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}]}`) + }) + + t.Run("JSON Response with protocol filtering", func(t *testing.T) { + runTest(t, mediaTypeJSON, "", "transport-ipfs-gateway-http", false, false, + `{"Providers":[{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-ipfs-gateway-http"],"Schema":"peer"}]}`) }) t.Run("Empty JSON Response", func(t *testing.T) { - runTest(t, mediaTypeJSON, true, false, `{"Providers":null}`) + runTest(t, mediaTypeJSON, "", "", true, false, `{"Providers":null}`) }) t.Run("Wildcard Accept header defaults to JSON Response", func(t *testing.T) { accept := "text/html,*/*" - runTest(t, accept, true, false, `{"Providers":null}`) + runTest(t, accept, "", "", true, false, `{"Providers":null}`) }) t.Run("Missing Accept header defaults to JSON Response", func(t *testing.T) { accept := "" - runTest(t, accept, true, false, `{"Providers":null}`) + runTest(t, accept, "", "", true, false, `{"Providers":null}`) }) t.Run("NDJSON Response", func(t *testing.T) { - runTest(t, mediaTypeNDJSON, false, true, `{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Schema":"bitswap","Protocol":"transport-bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz"}`+"\n") + runTest(t, mediaTypeNDJSON, "", "", false, true, `{"Addrs":["/ip4/127.0.0.1/tcp/4001","/ip4/127.0.0.1/udp/4001/quic-v1","/ip4/127.0.0.1/tcp/4001/ws","/ip4/102.101.1.1/tcp/4001/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit","/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit","/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-ipfs-gateway-http"],"Schema":"peer"}`+"\n") + }) + + t.Run("NDJSON Response with addr filtering", func(t *testing.T) { + runTest(t, mediaTypeNDJSON, "webtransport,!p2p-circuit,unknown", "", false, true, `{"Addrs":["/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-ipfs-gateway-http"],"Schema":"peer"}`+"\n") + }) + + t.Run("NDJSON Response with addr filtering", func(t *testing.T) { + runTest(t, mediaTypeNDJSON, "webtransport,!p2p-circuit,unknown", "", false, true, `{"Addrs":["/ip4/8.8.8.8/udp/4001/quic-v1/webtransport"],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-ipfs-gateway-http"],"Schema":"peer"}`+"\n") }) t.Run("Empty NDJSON Response", func(t *testing.T) { - runTest(t, mediaTypeNDJSON, true, true, "") + runTest(t, mediaTypeNDJSON, "", "", true, true, "") }) t.Run("404 when router returns routing.ErrNotFound", func(t *testing.T) { @@ -217,10 +261,14 @@ func TestProviders(t *testing.T) { } func TestPeers(t *testing.T) { - makeRequest := func(t *testing.T, router *mockContentRouter, contentType, arg string) *http.Response { + makeRequest := func(t *testing.T, router *mockContentRouter, contentType, arg, filterAddrs, filterProtocols string) *http.Response { server := httptest.NewServer(Handler(router)) t.Cleanup(server.Close) - req, err := http.NewRequest(http.MethodGet, "http://"+server.Listener.Addr().String()+"/routing/v1/peers/"+arg, nil) + + urlStr := fmt.Sprintf("http://%s/routing/v1/peers/%s", server.Listener.Addr().String(), arg) + urlStr = filters.AddFiltersToURL(urlStr, strings.Split(filterProtocols, ","), strings.Split(filterAddrs, ",")) + + req, err := http.NewRequest(http.MethodGet, urlStr, nil) require.NoError(t, err) if contentType != "" { req.Header.Set("Accept", contentType) @@ -234,7 +282,7 @@ func TestPeers(t *testing.T) { t.Parallel() router := &mockContentRouter{} - resp := makeRequest(t, router, mediaTypeJSON, "nonpeerid") + resp := makeRequest(t, router, mediaTypeJSON, "nonpeerid", "", "") require.Equal(t, 400, resp.StatusCode) }) @@ -247,7 +295,7 @@ func TestPeers(t *testing.T) { router := &mockContentRouter{} router.On("FindPeers", mock.Anything, pid, DefaultRecordsLimit).Return(results, nil) - resp := makeRequest(t, router, mediaTypeJSON, peer.ToCid(pid).String()) + resp := makeRequest(t, router, mediaTypeJSON, peer.ToCid(pid).String(), "", "") require.Equal(t, 404, resp.StatusCode) require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) @@ -267,7 +315,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, DefaultRecordsLimit).Return(results, nil) // Simulate request with Accept header that includes wildcard match - resp := makeRequest(t, router, "text/html,*/*", peer.ToCid(pid).String()) + resp := makeRequest(t, router, "text/html,*/*", peer.ToCid(pid).String(), "", "") // Expect response to default to application/json require.Equal(t, 404, resp.StatusCode) @@ -285,7 +333,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, DefaultRecordsLimit).Return(results, nil) // Simulate request without Accept header - resp := makeRequest(t, router, "", peer.ToCid(pid).String()) + resp := makeRequest(t, router, "", peer.ToCid(pid).String(), "", "") // Expect response to default to application/json require.Equal(t, 404, resp.StatusCode) @@ -301,7 +349,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, DefaultRecordsLimit).Return(nil, routing.ErrNotFound) // Simulate request without Accept header - resp := makeRequest(t, router, "", peer.ToCid(pid).String()) + resp := makeRequest(t, router, "", peer.ToCid(pid).String(), "", "") // Expect response to default to application/json require.Equal(t, 404, resp.StatusCode) @@ -331,7 +379,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, DefaultRecordsLimit).Return(results, nil) libp2pKeyCID := peer.ToCid(pid).String() - resp := makeRequest(t, router, mediaTypeJSON, libp2pKeyCID) + resp := makeRequest(t, router, mediaTypeJSON, libp2pKeyCID, "", "") require.Equal(t, 200, resp.StatusCode) require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) @@ -347,6 +395,110 @@ func TestPeers(t *testing.T) { require.Equal(t, expectedBody, string(body)) }) + t.Run("GET /routing/v1/peers/{cid-libp2p-key-peer-id} returns 200 with correct body and headers (JSON) with filter-addrs", func(t *testing.T) { + t.Parallel() + + addr1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + addr2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/udp/4001/quic-v1") + addr3, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/ws") + addr4, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit") + addr5, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu") + _, pid := makeEd25519PeerID(t) + _, pid2 := makeEd25519PeerID(t) + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap", "transport-foo"}, + Addrs: []types.Multiaddr{ + {Multiaddr: addr1}, + {Multiaddr: addr2}, + {Multiaddr: addr3}, + {Multiaddr: addr4}, + }, + }}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid2, + Protocols: []string{"transport-foo"}, + Addrs: []types.Multiaddr{ + {Multiaddr: addr5}, + }, + }}, + }) + + router := &mockContentRouter{} + router.On("FindPeers", mock.Anything, pid, DefaultRecordsLimit).Return(results, nil) + + libp2pKeyCID := peer.ToCid(pid).String() + resp := makeRequest(t, router, mediaTypeJSON, libp2pKeyCID, "tcp", "") + require.Equal(t, 200, resp.StatusCode) + + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, "public, max-age=300, stale-while-revalidate=172800, stale-if-error=172800", resp.Header.Get("Cache-Control")) + + requireCloseToNow(t, resp.Header.Get("Last-Modified")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedBody := `{"Peers":[{"Addrs":["/ip4/127.0.0.1/tcp/4001","/ip4/127.0.0.1/tcp/4001/ws"],"ID":"` + pid.String() + `","Protocols":["transport-bitswap","transport-foo"],"Schema":"peer"}]}` + require.Equal(t, expectedBody, string(body)) + }) + + t.Run("GET /routing/v1/peers/{cid-libp2p-key-peer-id} returns 200 with correct body and headers (JSON) with filter-protocols", func(t *testing.T) { + t.Parallel() + + addr1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") + addr2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/udp/4001/quic-v1") + addr3, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001/ws") + addr4, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit") + addr5, _ := multiaddr.NewMultiaddr("/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu") + _, pid := makeEd25519PeerID(t) + _, pid2 := makeEd25519PeerID(t) + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap", "transport-foo"}, + Addrs: []types.Multiaddr{ + {Multiaddr: addr1}, + {Multiaddr: addr2}, + {Multiaddr: addr3}, + {Multiaddr: addr4}, + }, + }}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid2, + Protocols: []string{"transport-foo"}, + Addrs: []types.Multiaddr{ + {Multiaddr: addr5}, + }, + }}, + }) + + router := &mockContentRouter{} + router.On("FindPeers", mock.Anything, pid, DefaultRecordsLimit).Return(results, nil) + + libp2pKeyCID := peer.ToCid(pid).String() + resp := makeRequest(t, router, mediaTypeJSON, libp2pKeyCID, "", "transport-bitswap") + require.Equal(t, 200, resp.StatusCode) + + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, "public, max-age=300, stale-while-revalidate=172800, stale-if-error=172800", resp.Header.Get("Cache-Control")) + + requireCloseToNow(t, resp.Header.Get("Last-Modified")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedBody := `{"Peers":[{"Addrs":["/ip4/127.0.0.1/tcp/4001","/ip4/127.0.0.1/udp/4001/quic-v1","/ip4/127.0.0.1/tcp/4001/ws","/ip4/102.101.1.1/udp/4001/quic-v1/webtransport/p2p/12D3KooWEjsGPUQJ4Ej3d1Jcg4VckWhFbhc6mkGunMm1faeSzZMu/p2p-circuit"],"ID":"` + pid.String() + `","Protocols":["transport-bitswap","transport-foo"],"Schema":"peer"}]}` + require.Equal(t, expectedBody, string(body)) + }) + t.Run("GET /routing/v1/peers/{cid-libp2p-key-peer-id} returns 404 with correct body and headers (No Results, NDJSON)", func(t *testing.T) { t.Parallel() @@ -356,7 +508,7 @@ func TestPeers(t *testing.T) { router := &mockContentRouter{} router.On("FindPeers", mock.Anything, pid, DefaultStreamingRecordsLimit).Return(results, nil) - resp := makeRequest(t, router, mediaTypeNDJSON, peer.ToCid(pid).String()) + resp := makeRequest(t, router, mediaTypeNDJSON, peer.ToCid(pid).String(), "", "") require.Equal(t, 404, resp.StatusCode) require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) @@ -389,7 +541,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, DefaultStreamingRecordsLimit).Return(results, nil) libp2pKeyCID := peer.ToCid(pid).String() - resp := makeRequest(t, router, mediaTypeNDJSON, libp2pKeyCID) + resp := makeRequest(t, router, mediaTypeNDJSON, libp2pKeyCID, "", "") require.Equal(t, 200, resp.StatusCode) require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) @@ -451,7 +603,7 @@ func TestPeers(t *testing.T) { router := &mockContentRouter{} router.On("FindPeers", mock.Anything, pid, DefaultStreamingRecordsLimit).Return(iter.FromSlice(results), nil) - resp := makeRequest(t, router, mediaTypeNDJSON, peerIDStr) + resp := makeRequest(t, router, mediaTypeNDJSON, peerIDStr, "", "") require.Equal(t, 200, resp.StatusCode) require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) @@ -471,7 +623,7 @@ func TestPeers(t *testing.T) { router := &mockContentRouter{} router.On("FindPeers", mock.Anything, pid, DefaultRecordsLimit).Return(iter.FromSlice(results), nil) - resp := makeRequest(t, router, mediaTypeJSON, peerIDStr) + resp := makeRequest(t, router, mediaTypeJSON, peerIDStr, "", "") require.Equal(t, 200, resp.StatusCode) require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) diff --git a/routing/http/types/iter/filter.go b/routing/http/types/iter/filter.go new file mode 100644 index 000000000..628997811 --- /dev/null +++ b/routing/http/types/iter/filter.go @@ -0,0 +1,43 @@ +package iter + +// Filter returns an iterator that filters out values that don't satisfy the predicate f. +func Filter[T any](iter Iter[T], f func(t T) bool) *FilterIter[T] { + return &FilterIter[T]{iter: iter, f: f} +} + +type FilterIter[T any] struct { + iter Iter[T] + f func(T) bool + + done bool + val T +} + +func (f *FilterIter[T]) Next() bool { + if f.done { + return false + } + + ok := f.iter.Next() + f.done = !ok + + if f.done { + return false + } + + f.val = f.iter.Val() + + if f.f(f.val) { + return true + } + + return f.Next() +} + +func (f *FilterIter[T]) Val() T { + return f.val +} + +func (f *FilterIter[T]) Close() error { + return f.iter.Close() +} diff --git a/routing/http/types/iter/filter_test.go b/routing/http/types/iter/filter_test.go new file mode 100644 index 000000000..6d170285e --- /dev/null +++ b/routing/http/types/iter/filter_test.go @@ -0,0 +1,41 @@ +package iter + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFilter(t *testing.T) { + for _, c := range []struct { + input Iter[int] + f func(int) bool + expResults []int + }{ + { + input: FromSlice([]int{1, 2, 3, 4}), + f: func(i int) bool { return i%2 == 0 }, + expResults: []int{2, 4}, + }, + { + input: FromSlice([]int{}), + f: func(i int) bool { return i%2 == 0 }, + expResults: nil, + }, + { + input: FromSlice([]int{1, 3, 5, 100}), + f: func(i int) bool { return i > 2 }, + expResults: []int{3, 5, 100}, + }, + } { + t.Run(fmt.Sprintf("%v", c.input), func(t *testing.T) { + iter := Filter(c.input, c.f) + var res []int + for iter.Next() { + res = append(res, iter.Val()) + } + assert.Equal(t, c.expResults, res) + }) + } +} diff --git a/routing/http/types/record_peer.go b/routing/http/types/record_peer.go index 76bd810e0..cb4a04fca 100644 --- a/routing/http/types/record_peer.go +++ b/routing/http/types/record_peer.go @@ -79,3 +79,13 @@ func (pr PeerRecord) MarshalJSON() ([]byte, error) { return drjson.MarshalJSONBytes(m) } + +func FromBitswapRecord(br *BitswapRecord) *PeerRecord { + return &PeerRecord{ + Schema: SchemaPeer, + ID: br.ID, + Addrs: br.Addrs, + Protocols: []string{br.Protocol}, + Extra: map[string]json.RawMessage{}, + } +} diff --git a/version.json b/version.json index 93d6ca712..ee0e5814d 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.23.0" + "version": "v0.24.0" }