diff --git a/package.json b/package.json index 8b0ba0a9..3dc6942b 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ }, "packageManager": "pnpm@8.15.0+sha256.fd1eab68a6d403f35cf3259c53780d70b0f14bd74e39da2f917d201f554d8665", "devDependencies": { - "@types/node": "^20.11.8", + "@types/node": "^20.11.9", "@types/wtfnode": "^0.7.3", "@typescript/lib-dom": "npm:@types/web@0.0.135", "@vitest/coverage-v8": "^1.2.2", diff --git a/packages/l3face/README.md b/packages/l3face/README.md index f376bfb7..2591b0bc 100644 --- a/packages/l3face/README.md +++ b/packages/l3face/README.md @@ -3,4 +3,14 @@ This package is part of [NDNts](https://yoursunny.com/p/NDNts/), Named Data Networking libraries for the modern web. This package implements network layer face and transport base types. +Notable content includes: + +* **Transport** type: transport base type, +* **StreamTransport** type: Node.js stream-based transport implementation. +* `rxFromStream` function: extract TLVs from continuous byte stream. +* `rxFromPacketIterable`: decode TLVs from datagrams. +* **L3Face** type: TLV-oriented network layer face, for use with logical Forwarder of `@ndn/fw` package. +* `L3Face.makeCreateFace` function: higher-order function that generates `*Transport.createFace` functions. +* **Bridge** type: pass packets between two logical forwarders, primarily for unit testing. + See `@ndn/node-transport` package for more explanation and examples. diff --git a/packages/l3face/package.json b/packages/l3face/package.json index 68d7f348..d36efbe9 100644 --- a/packages/l3face/package.json +++ b/packages/l3face/package.json @@ -39,8 +39,5 @@ "tslib": "^2.6.2", "type-fest": "^4.10.1", "typescript-event-target": "^1.1.0" - }, - "devDependencies": { - "p-defer": "^4.0.0" } } \ No newline at end of file diff --git a/packages/l3face/src/bridge.ts b/packages/l3face/src/bridge.ts new file mode 100644 index 00000000..7790ce64 --- /dev/null +++ b/packages/l3face/src/bridge.ts @@ -0,0 +1,213 @@ +import { Forwarder, type FwFace } from "@ndn/fw"; +import type { NameLike } from "@ndn/packet"; +import { Decoder } from "@ndn/tlv"; +import { assert, Closers, delay, randomJitter } from "@ndn/util"; +import { pushable } from "it-pushable"; +import { filter, map, pipeline, transform } from "streaming-iterables"; + +import { L3Face } from "./l3face"; +import { Transport } from "./transport"; + +class BridgeTransport extends Transport { + public override readonly rx: Transport.Rx; + public bridgePeer?: BridgeTransport; + private readonly bridgeRx = pushable({ objectMode: true }); + + constructor(bridgeName: string, relay: Bridge.RelayFunc, private readonly closePromise: Promise) { + super({ describe: `BRIDGE(${bridgeName})` }); + this.rx = map((wire) => new Decoder(wire).read(), relay(this.bridgeRx)); + } + + public override readonly tx = async (iterable: AsyncIterable) => { + assert(this.bridgePeer, "bridgePeer must be set"); + const iterator = iterable[Symbol.asyncIterator](); + while (true) { + const result = await Promise.race([ + iterator.next(), + this.closePromise, + ]); + if (!result || result.done) { // normal close + return; + } + const copy = result.value.slice(); + this.bridgePeer.bridgeRx.push(copy); + } + }; +} + +/** + * A bridge passes packets between two logical forwarders. + * Disposing the bridge severs the link. + */ +export interface Bridge extends Disposable { + readonly fwA: Forwarder; + readonly fwB: Forwarder; + /** Face on fwA linking to fwB. */ + readonly faceA: FwFace; + /** Face on fwB linking to fwA. */ + readonly faceB: FwFace; + + /** Change fw* and face* property names. */ + rename(A: A, B: B): Bridge.Renamed; +} + +function makeRelayFunc(relay: Bridge.Relay): Bridge.RelayFunc { + if (typeof relay === "function") { + return relay; + } + const { + loss = 0, + delay: delayMs = 1, + jitter = 0, + } = relay; + const delayJitter = randomJitter(jitter, delayMs); + return (it) => pipeline( + () => it, + filter(() => loss === 0 || Math.random() >= loss), + transform(64, async (pkt) => { + await delay(delayJitter()); + return pkt; + }), + ); +} + +function rename(this: Bridge, A: A, B: B): Bridge.Renamed { + const map = { + [`fw${A}`]: "fwA", + [`fw${B}`]: "fwB", + [`face${A}`]: "faceA", + [`face${B}`]: "faceB", + }; + return new Proxy(this as unknown as Bridge.Renamed, { + get(target, prop, receiver) { + if (typeof prop === "string" && map[prop]) { + prop = map[prop] as any; + } + return Reflect.get(target, prop, receiver); + }, + }); +} + +export namespace Bridge { + /** + * Function to relay packets between two logical forwarders. + * @param it iterable of packet buffers received from peer side. + * @returns iterable of packet buffers injected into our side. + */ + export type RelayFunc = (it: AsyncIterable) => AsyncIterable; + + /** Options to relay packets with loss, delay, and jitter. */ + export interface RelayOptions { + /** + * Packet loss rate between 0.0 (no loss) and 1.0 (100% loss). + * @default 0 + */ + loss?: number; + + /** + * Median delay in milliseconds. + * @default 1 + */ + delay?: number; + + /** + * Jitter around median delay, see @ndn/util randomJitter function. + * @default 0 + */ + jitter?: number; + } + + export type Relay = RelayFunc | RelayOptions; + + export interface CreateOptions { + /** Description for debugging purpose. */ + bridgeName?: string; + + /** + * Forwarder A. + * Default is a new Forwarder that can be retrieved with bridge.fwA . + * Disposing the bridge closes auto-created Forwarder but not passed-in Forwarder. + */ + fwA?: Forwarder; + + /** + * Forwarder B. + * Default is a new Forwarder that can be retrieved with bridge.fwB . + * Disposing the bridge closes auto-created Forwarder but not passed-in Forwarder. + */ + fwB?: Forwarder; + + /** Options for creating Forwarder instances. */ + fwOpts?: Forwarder.Options; + + /** + * Relay options for packets from forwarder A to forwarder B. + * Default is 0% loss and 1ms delay. + */ + relayAB?: Relay; + /** + * Relay options for packets from forwarder B to forwarder A. + * Default is 0% loss and 1ms delay. + */ + relayBA?: Relay; + + /** + * Routes from forwarder A to forwarder B. + * Default is ["/"]. + */ + routesAB?: readonly NameLike[]; + /** + * Routes from forwarder B to forwarder A. + * Default is ["/"]. + */ + routesBA?: readonly NameLike[]; + } + + /** Create a bridge that passes packets between two logical forwarders. */ + export function create({ + bridgeName = "bridge", + fwA, + fwB, + fwOpts, + relayAB = (x) => x, + relayBA = (x) => x, + routesAB, + routesBA, + }: CreateOptions = {}): Bridge { + const closers = new Closers(); + const closing = closers.wait(); + if (!fwA) { + closers.push((fwA = Forwarder.create(fwOpts))); + } + if (!fwB) { + closers.push((fwB = Forwarder.create(fwOpts))); + } + + const tA = new BridgeTransport(bridgeName, makeRelayFunc(relayBA), closing); + const tB = new BridgeTransport(bridgeName, makeRelayFunc(relayAB), closing); + tA.bridgePeer = tB; + tB.bridgePeer = tA; + + const faceA = fwA.addFace(new L3Face(tA, { advertiseFrom: false })); + L3Face.processAddRoutes(faceA, routesAB); + const faceB = fwB.addFace(new L3Face(tB, { advertiseFrom: false })); + L3Face.processAddRoutes(faceB, routesBA); + + return { + fwA, + fwB, + faceA, + faceB, + rename, + [Symbol.dispose]() { + closers.close(); + }, + }; + } + + export type Renamed = Disposable & { + [k in `fw${A | B}`]: Forwarder; + } & { + [k in `face${A | B}`]: FwFace; + }; +} diff --git a/packages/l3face/src/mod.ts b/packages/l3face/src/mod.ts index d3705590..fc2934bb 100644 --- a/packages/l3face/src/mod.ts +++ b/packages/l3face/src/mod.ts @@ -1,3 +1,4 @@ +export * from "./bridge"; export * from "./l3face"; export * from "./rxtx-iterable"; export * from "./rxtx-stream"; diff --git a/packages/l3face/src/rxtx-stream.ts b/packages/l3face/src/rxtx-stream.ts index b4bb5293..808293e5 100644 --- a/packages/l3face/src/rxtx-stream.ts +++ b/packages/l3face/src/rxtx-stream.ts @@ -1,7 +1,7 @@ import type { Socket } from "node:net"; import { Decoder } from "@ndn/tlv"; -import { safeIter } from "@ndn/util"; +import { concatBuffers, safeIter } from "@ndn/util"; import { pEvent } from "p-event"; import { writeToStream } from "streaming-iterables"; @@ -13,10 +13,10 @@ import type { Transport } from "./transport"; * @returns AsyncIterable of TLVs. */ export async function* rxFromStream(conn: NodeJS.ReadableStream): Transport.Rx { - let leftover = Buffer.alloc(0); + let leftover = new Uint8Array(); for await (const chunk of safeIter(conn as AsyncIterable)) { if (leftover.length > 0) { - leftover = Buffer.concat([leftover, chunk], leftover.length + chunk.length); + leftover = concatBuffers([leftover, chunk], leftover.length + chunk.length); } else { leftover = chunk; } diff --git a/packages/l3face/test-fixture/bridge.ts b/packages/l3face/test-fixture/bridge.ts deleted file mode 100644 index d840f91a..00000000 --- a/packages/l3face/test-fixture/bridge.ts +++ /dev/null @@ -1,130 +0,0 @@ -import type { Forwarder, FwFace } from "@ndn/fw"; -import type { NameLike } from "@ndn/packet"; -import { Decoder } from "@ndn/tlv"; -import { delay, randomJitter } from "@ndn/util"; -import { pushable } from "it-pushable"; -import pDefer from "p-defer"; -import { filter, map, pipeline, transform } from "streaming-iterables"; - -import { L3Face, Transport } from ".."; - -class BridgeTransport extends Transport { - public override readonly rx: Transport.Rx; - public bridgePeer?: BridgeTransport; - private readonly bridgeRx = pushable({ objectMode: true }); - - constructor(bridgeName: string, relay: Bridge.RelayFunc, private readonly closePromise: Promise) { - super({ describe: `BRIDGE(${bridgeName})` }); - this.rx = map((wire) => new Decoder(wire).read(), relay(this.bridgeRx)); - } - - public override readonly tx = async (iterable: AsyncIterable) => { - const iterator = iterable[Symbol.asyncIterator](); - while (true) { - const result = await Promise.race([ - iterator.next(), - this.closePromise, - ]); - if (!result || result.done) { // normal close - return; - } - const copy = new Uint8Array(result.value); - this.bridgePeer?.bridgeRx.push(copy); - } - }; -} - -/** A bridge that links two forwarders. */ -export interface Bridge extends Disposable { - faceA: FwFace; - faceB: FwFace; -} - -function makeRelayFunc(relay: Bridge.Relay): Bridge.RelayFunc { - if (typeof relay === "function") { - return relay; - } - const { - loss = 0, - delay: delayMs = 1, - jitter = 0, - } = relay; - const delayJitter = randomJitter(jitter, delayMs); - return (it) => pipeline( - () => it, - filter(() => loss === 0 || Math.random() >= loss), - transform(64, async (pkt) => { - await delay(delayJitter()); - return pkt; - }), - ); -} - -export namespace Bridge { - export type RelayFunc = (it: AsyncIterable) => AsyncIterable; - - export interface RelayOptions { - /** - * Packet loss rate between 0.0 (no loss) and 1.0 (100% loss). - * @default 0 - */ - loss?: number; - - /** - * Median delay in milliseconds. - * @default 1 - */ - delay?: number; - - /** - * Jitter around median delay, see @ndn/util randomJitter function. - * @default 0 - */ - jitter?: number; - } - - export type Relay = RelayFunc | RelayOptions; - - export interface CreateOptions { - bridgeName?: string; - fwA: Forwarder; - fwB: Forwarder; - relayAB?: Relay; - relayBA?: Relay; - routesAB?: readonly NameLike[]; - routesBA?: readonly NameLike[]; - } - - /** - * Create a bridge that links two forwarders. - * The relay functions can inject loss, delay, and jitter to the simulated link. - */ - export function create({ - bridgeName = "bridge", - fwA, - fwB, - relayAB = (x) => x, - relayBA = (x) => x, - routesAB, - routesBA, - }: CreateOptions): Bridge { - const close = pDefer(); - const tA = new BridgeTransport(bridgeName, makeRelayFunc(relayBA), close.promise); - const tB = new BridgeTransport(bridgeName, makeRelayFunc(relayAB), close.promise); - tA.bridgePeer = tB; - tB.bridgePeer = tA; - const faceA = fwA.addFace(new L3Face(tA, { advertiseFrom: false })); - L3Face.processAddRoutes(faceA, routesAB); - const faceB = fwB.addFace(new L3Face(tB, { advertiseFrom: false })); - L3Face.processAddRoutes(faceB, routesBA); - return { - faceA, - faceB, - [Symbol.dispose]() { - faceA.close(); - faceB.close(); - close.resolve(false); - }, - }; - } -} diff --git a/packages/nfdmgmt/tests/prefix-reg.t.ts b/packages/nfdmgmt/tests/prefix-reg.t.ts index c96ffad5..67a7cd16 100644 --- a/packages/nfdmgmt/tests/prefix-reg.t.ts +++ b/packages/nfdmgmt/tests/prefix-reg.t.ts @@ -4,7 +4,7 @@ import { Endpoint } from "@ndn/endpoint"; import { Forwarder, type FwFace, FwPacket } from "@ndn/fw"; import { NoopFace } from "@ndn/fw/test-fixture/noop-face"; import { Certificate, generateSigningKey, KeyChain, ValidityPeriod } from "@ndn/keychain"; -import { Bridge } from "@ndn/l3face/test-fixture/bridge"; +import { Bridge } from "@ndn/l3face"; import { Component, Data, Interest, type Name, ParamsDigest } from "@ndn/packet"; import { Decoder, Encoder, NNI } from "@ndn/tlv"; import { Closers, delay } from "@ndn/util"; @@ -136,10 +136,10 @@ test("preloadCert", async () => { }); await userKeyChain.insertCert(userCert); - const nfdFw = Forwarder.create(); - const nfdEp = new Endpoint({ fw: nfdFw }); + using bridge = Bridge.create().rename("NFD", "User"); + const nfdEp = new Endpoint({ fw: bridge.fwNFD }); const interP = new Endpoint({ - fw: nfdFw, + fw: bridge.fwNFD, announcement: false, }).produce(interPub.name, async () => interCert.data); let nCommands = 0; @@ -152,19 +152,16 @@ test("preloadCert", async () => { [0x66, NNI(200)], [0x67]])); }); + closers.push(nfdP, interP); - const userFw = Forwarder.create(); - using bridge = Bridge.create({ fwA: nfdFw, fwB: userFw }); - closers.push(nfdFw, nfdP, interP, userFw); - - enableNfdPrefixReg(bridge.faceB, { + enableNfdPrefixReg(bridge.faceUser, { signer: userPvt.withKeyLocator(userCert.name), preloadCertName: userCert.name, preloadFromKeyChain: userKeyChain, preloadInterestLifetime: 100, }); - const userEp = new Endpoint({ fw: userFw }); + const userEp = new Endpoint({ fw: bridge.fwUser }); const userPA = userEp.produce("/A", async () => undefined); const userPB = userEp.produce("/B", async () => undefined); closers.push(userPA, userPB); diff --git a/packages/repo-external/src/prps/publisher.ts b/packages/repo-external/src/prps/publisher.ts index cabfe425..4c693528 100644 --- a/packages/repo-external/src/prps/publisher.ts +++ b/packages/repo-external/src/prps/publisher.ts @@ -151,7 +151,7 @@ export namespace PrpsPublisher { * A callback function to generate publication packet. * @param name expected Data name. * @param topic topic name. - * @return either a Data that is already signed, or an Encodable object to use as publication body. + * @returns either a Data that is already signed, or an Encodable object to use as publication body. */ export type PublicationCallback = (name: Name, topic: Name) => Promise; } diff --git a/packages/segmented-object/tests/serve-fetch.t.ts b/packages/segmented-object/tests/serve-fetch.t.ts index 68f6326c..7cc4ecce 100644 --- a/packages/segmented-object/tests/serve-fetch.t.ts +++ b/packages/segmented-object/tests/serve-fetch.t.ts @@ -5,7 +5,7 @@ import { Blob } from "node:buffer"; import { Endpoint, type ProducerHandler } from "@ndn/endpoint"; import { Forwarder } from "@ndn/fw"; -import { Bridge } from "@ndn/l3face/test-fixture/bridge"; +import { Bridge } from "@ndn/l3face"; import { Segment2, Segment3 } from "@ndn/naming-convention2"; import { Data, FwHint, Name, type Verifier } from "@ndn/packet"; import { Closers, delay } from "@ndn/util"; @@ -17,9 +17,10 @@ import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from "vi import { BlobChunkSource, BufferChunkSource, fetch, FileChunkSource, IterableChunkSource, makeChunkSource, serve, StreamChunkSource } from ".."; import { makeObjectBody } from "../test-fixture/object-body"; +const fwOpts: Forwarder.Options = { dataNoTokenMatch: false }; const closers = new Closers(); const objectBody = makeObjectBody(); -beforeEach(() => Forwarder.replaceDefault(Forwarder.create({ dataNoTokenMatch: false }))); +beforeEach(() => Forwarder.replaceDefault(Forwarder.create(fwOpts))); afterEach(() => { closers.close(); Forwarder.deleteDefault(); @@ -135,20 +136,18 @@ test.each([ (fw, fwHint) => ({ fw, modifyInterest: { fwHint } }), (fw, fwHint) => ({ endpoint: new Endpoint({ fw, modifyInterest: { fwHint } }) }), ] satisfies Array<(fw: Forwarder, fwHint: FwHint) => fetch.Options>)("modifyInterest %#", async (makeOpts) => { - const fwA = Forwarder.create({ dataNoTokenMatch: false }); - fwA.nodeNames.push(new Name("/S")); - const server = serve("/R", new BufferChunkSource(objectBody), { endpoint: new Endpoint({ fw: fwA }) }); - closers.push(server); - - const fwB = Forwarder.create({ dataNoTokenMatch: false }); using bridge = Bridge.create({ - fwA, - fwB, + fwOpts, routesAB: [], routesBA: ["/S"], + }).rename("S", "F"); + bridge.fwS.nodeNames.push(new Name("/S")); + const server = serve("/R", new BufferChunkSource(objectBody), { + endpoint: new Endpoint({ fw: bridge.fwS }), }); + closers.push(server); - await expect(fetch("/R", makeOpts(fwB, new FwHint("/S")))).resolves.toEqualUint8Array(objectBody); + await expect(fetch("/R", makeOpts(bridge.fwF, new FwHint("/S")))).resolves.toEqualUint8Array(objectBody); }); describe("empty object", () => { @@ -217,10 +216,6 @@ test("abort", async () => { }); test("congestion avoidance", async () => { - const fw = Forwarder.create(); - const server = serve("/R", new BufferChunkSource(objectBody), { endpoint: new Endpoint({ fw }) }); - closers.push(server); - const relay: Bridge.RelayOptions = { loss: 0.02, delay: 50, @@ -228,10 +223,15 @@ test("congestion avoidance", async () => { }; using bridge = Bridge.create({ fwA: Forwarder.getDefault(), - fwB: fw, + fwOpts, relayAB: relay, relayBA: relay, + }).rename("F", "S"); + + const server = serve("/R", new BufferChunkSource(objectBody), { + endpoint: new Endpoint({ fw: bridge.fwS }), }); + closers.push(server); const fetched = fetch("/R"); await expect(fetched).resolves.toEqualUint8Array(objectBody); diff --git a/packages/sync/src/psync/param-compat.ts b/packages/sync/src/psync/param-compat.ts index 92ad3059..0882a3e3 100644 --- a/packages/sync/src/psync/param-compat.ts +++ b/packages/sync/src/psync/param-compat.ts @@ -58,7 +58,7 @@ function splitPrefixSeqNum(value: Uint8Array) { } const noCompression: PSyncCodec.Compression = { - compress: (input) => new Uint8Array(input), // make a copy + compress: (input) => input.slice(), // make a copy decompress: (input) => input, }; diff --git a/packages/sync/tests/psync-full.t.ts b/packages/sync/tests/psync-full.t.ts index f32fc701..e5281427 100644 --- a/packages/sync/tests/psync-full.t.ts +++ b/packages/sync/tests/psync-full.t.ts @@ -2,7 +2,7 @@ import "@ndn/packet/test-fixture/expect"; import { Endpoint } from "@ndn/endpoint"; import { Forwarder } from "@ndn/fw"; -import { Bridge } from "@ndn/l3face/test-fixture/bridge"; +import { Bridge } from "@ndn/l3face"; import { Name, type NameLike } from "@ndn/packet"; import { assert, Closers, delay, toHex } from "@ndn/util"; import DefaultMap from "mnemonist/default-map.js"; @@ -71,17 +71,15 @@ class Fixture { this.syncs.push(new PSyncFull({ ...opts })); for (let i = 1; i < n; ++i) { - const fw = Forwarder.create(); const bridge = Bridge.create({ fwA: Forwarder.getDefault(), - fwB: fw, relayAB: { loss, delay: 3, jitter: 0.6 }, relayBA: { delay: 3, jitter: 0.6 }, }); this.syncs.push(new PSyncFull({ ...opts, - endpoint: new Endpoint({ fw }), + endpoint: new Endpoint({ fw: bridge.fwB }), })); closers.push(bridge); } diff --git a/packages/sync/tests/svs.t.ts b/packages/sync/tests/svs.t.ts index 91cbdf4a..3b065406 100644 --- a/packages/sync/tests/svs.t.ts +++ b/packages/sync/tests/svs.t.ts @@ -1,6 +1,6 @@ import { Endpoint } from "@ndn/endpoint"; import { Forwarder } from "@ndn/fw"; -import { Bridge } from "@ndn/l3face/test-fixture/bridge"; +import { Bridge } from "@ndn/l3face"; import { Name } from "@ndn/packet"; import { Closers, delay } from "@ndn/util"; import DefaultMap from "mnemonist/default-map.js"; @@ -58,14 +58,11 @@ afterEach(closers.close); // specification section 5.2 example test("example", async () => { const debugHandler = new DebugHandler(); - const fwAB = Forwarder.create(); - const fwC = Forwarder.create(); let lossToC = false; using bridge = Bridge.create({ - fwA: fwAB, - fwB: fwC, relayAB: (it) => filter(() => !lossToC, it), - }); + }).rename("AB", "C"); + const { fwAB, fwC } = bridge; const opts: SvSync.Options = { ...baseOpts, diff --git a/packages/sync/tests/syncps.t.ts b/packages/sync/tests/syncps.t.ts index 38ffa15f..6d71af8b 100644 --- a/packages/sync/tests/syncps.t.ts +++ b/packages/sync/tests/syncps.t.ts @@ -2,7 +2,7 @@ import "@ndn/packet/test-fixture/expect"; import { Endpoint } from "@ndn/endpoint"; import { Forwarder } from "@ndn/fw"; -import { Bridge } from "@ndn/l3face/test-fixture/bridge"; +import { Bridge } from "@ndn/l3face"; import { Timestamp } from "@ndn/naming-convention2"; import { Data, Name, type NameLike } from "@ndn/packet"; import { assert, Closers, delay } from "@ndn/util"; @@ -57,17 +57,15 @@ class Fixture { this.syncs.push(new SyncpsPubsub({ ...opts })); for (let i = 1; i < n; ++i) { - const fw = Forwarder.create(); const bridge = Bridge.create({ fwA: Forwarder.getDefault(), - fwB: fw, relayAB: { loss, delay: 3, jitter: 0.6 }, relayBA: { delay: 3, jitter: 0.6 }, }); this.syncs.push(new SyncpsPubsub({ ...opts, - endpoint: new Endpoint({ fw }), + endpoint: new Endpoint({ fw: bridge.fwB }), })); closers.push(bridge); }