Skip to content

Commit

Permalink
segmented-object: Fetcher inherits options from Endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jan 28, 2024
1 parent ec10486 commit f55b905
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 44 deletions.
13 changes: 7 additions & 6 deletions packages/packet/src/interest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ interface CtorTag {
[ctorAssign](f: Fields): void;
}

const modifyFields = [
"canBePrefix", "mustBeFresh", "fwHint", "lifetime", "hopLimit",
] as const satisfies ReadonlyArray<keyof PublicFields>;

export namespace Interest {
/** Generate a random nonce. */
export function generateNonce(): number {
Expand Down Expand Up @@ -294,22 +298,19 @@ export namespace Interest {
export type ModifyFunc = (interest: Interest) => void;

/** Common fields to assign onto an existing Interest. */
export type ModifyFields = Partial<Pick<Fields, "canBePrefix" | "mustBeFresh" | "fwHint" | "lifetime" | "hopLimit">>;
export type ModifyFields = Partial<Pick<PublicFields, typeof modifyFields[number]>>;

/** A structure to modify an existing Interest. */
export type Modify = ModifyFunc | ModifyFields;

/** Turn ModifyFields to ModifyFunc; return ModifyFunc as-is. */
export function makeModifyFunc(input?: Modify): ModifyFunc {
if (!input) {
return () => undefined;
}
export function makeModifyFunc(input: Modify = () => undefined): ModifyFunc {
if (typeof input === "function") {
return input;
}

const patch: Schema<ModifyFields, unknown> = {};
for (const key of ["canBePrefix", "mustBeFresh", "fwHint", "lifetime", "hopLimit"] as const) {
for (const key of modifyFields) {
if (input[key] !== undefined) {
patch[key] = input[key];
}
Expand Down
85 changes: 56 additions & 29 deletions packages/segmented-object/src/fetch/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,36 @@ export class Fetcher extends TypedEventTarget<EventMap> {
private count_ = 0;
private readonly logic: FetchLogic;
private readonly face: FwFace;
private readonly modifyInterest: Interest.ModifyFunc;

constructor(private readonly name: Name, private readonly opts: Fetcher.Options) {
private readonly segmentNumConvention!: SegmentConvention;
private readonly modifyInterest!: Interest.ModifyFunc;
private readonly signal?: AbortSignal;
private readonly lifetimeAfterRto!: number;
private readonly acceptContentType!: readonly number[];
private readonly verifier?: Verifier;

constructor(private readonly name: Name, opts: Fetcher.Options) {
super();

const endpoint = opts.endpoint;
const {
fw = endpoint?.fw ?? Forwarder.getDefault(),
describe = `fetch(${name})`,
segmentNumConvention = defaultSegmentConvention,
modifyInterest = endpoint?.opts.modifyInterest,
signal = endpoint?.opts.signal,
lifetimeAfterRto = 1000,
acceptContentType = [0],
verifier = endpoint?.opts.verifier,
} = opts;
Object.assign(this, {
segmentNumConvention,
modifyInterest: Interest.makeModifyFunc(modifyInterest),
signal,
lifetimeAfterRto,
acceptContentType,
verifier,
} satisfies Fetcher.Options);

this.logic = new FetchLogic(opts);
this.logic.addEventListener("end", () => {
this.dispatchTypedEvent("end", new Event("end"));
Expand All @@ -36,20 +62,16 @@ export class Fetcher extends TypedEventTarget<EventMap> {
this.fail(new Error(`cannot retrieve segment ${segNum}`));
});

this.face = (opts.endpoint?.fw ?? Forwarder.getDefault()).addFace({
this.face = fw.addFace({
rx: this.tx(),
tx: this.rx,
}, {
describe: opts.describe ?? `fetch(${name})`,
});

this.modifyInterest = Interest.makeModifyFunc(opts.modifyInterest);
}, { describe });

opts.signal?.addEventListener("abort", this.handleAbort);
this.signal?.addEventListener("abort", this.handleAbort);
}

public close() {
this.opts.signal?.removeEventListener("abort", this.handleAbort);
public close(): void {
this.signal?.removeEventListener("abort", this.handleAbort);
this.logic.close();
this.face.close();
}
Expand All @@ -63,14 +85,10 @@ export class Fetcher extends TypedEventTarget<EventMap> {
}

private tx(): AsyncIterable<FwPacket> {
const {
segmentNumConvention = defaultSegmentConvention,
lifetimeAfterRto = 1000,
} = this.opts;
return this.logic.outgoing(
({ segNum, rto }) => {
const interest = new Interest(this.name.append(segmentNumConvention, segNum),
Interest.Lifetime(rto + lifetimeAfterRto));
const interest = new Interest(this.name.append(this.segmentNumConvention, segNum),
Interest.Lifetime(rto + this.lifetimeAfterRto));
this.modifyInterest(interest);
return FwPacket.create(interest, segNum);
},
Expand All @@ -79,11 +97,8 @@ export class Fetcher extends TypedEventTarget<EventMap> {
}

private readonly rx = async (iterable: AsyncIterable<FwPacket>) => {
const {
acceptContentType = [0],
} = this.opts;
for await (const { l3, token, congestionMark = 0 } of iterable) {
if (l3 instanceof Data && typeof token === "number" && acceptContentType.includes(l3.contentType)) {
if (l3 instanceof Data && typeof token === "number" && this.acceptContentType.includes(l3.contentType)) {
void this.handleData(l3, token, congestionMark);
}
}
Expand All @@ -92,7 +107,7 @@ export class Fetcher extends TypedEventTarget<EventMap> {
private async handleData(data: Data, segNum: number, congestionMark: number) {
const now = this.logic.now();
try {
await this.opts.verifier?.verify(data);
await this.verifier?.verify(data);
} catch (err: unknown) {
this.fail(new Error(`cannot verify segment ${segNum}: ${err}`));
return;
Expand All @@ -101,11 +116,8 @@ export class Fetcher extends TypedEventTarget<EventMap> {
this.logic.satisfy(segNum, now, congestionMark !== 0);
if (data.isFinalBlock) {
this.logic.setFinalSegNum(segNum);
} else {
let segmentConvention: SegmentConvention;
if (data.finalBlockId?.is((segmentConvention = this.opts.segmentNumConvention ?? defaultSegmentConvention))) {
this.logic.setFinalSegNum(data.finalBlockId.as(segmentConvention), true);
}
} else if (data.finalBlockId?.is(this.segmentNumConvention)) {
this.logic.setFinalSegNum(data.finalBlockId.as(this.segmentNumConvention), true);
}
++this.count_;
this.dispatchTypedEvent("segment", new Fetcher.SegmentDataEvent("segment", segNum, data));
Expand All @@ -125,9 +137,24 @@ export class Fetcher extends TypedEventTarget<EventMap> {

export namespace Fetcher {
export interface Options extends FetchLogic.Options {
/** Use the specified endpoint instead of the default. */
/**
* Inherit fetcher options from Endpoint consumer options.
*
* These options are inherited if the corresponding fetcher option is unset:
* @li fw
* @li modifyInterest
* @li signal
* @li verifier
*
* Other options cannot be inherited, notably:
* @li describe
* @li retx
*/
endpoint?: Endpoint;

/** Use the specified logical forwarder instead of the default. */
fw?: Forwarder;

/** FwFace description. */
describe?: string;

Expand Down
38 changes: 29 additions & 9 deletions packages/segmented-object/tests/serve-fetch.t.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Endpoint, type ProducerHandler } from "@ndn/endpoint";
import { Forwarder } from "@ndn/fw";
import { Bridge } from "@ndn/l3face/test-fixture/bridge";
import { Segment2, Segment3 } from "@ndn/naming-convention2";
import { Data, Name, type Verifier } from "@ndn/packet";
import { Data, FwHint, Name, type Verifier } from "@ndn/packet";
import { Closers, delay } from "@ndn/util";
import { deleteTmpFiles, writeTmpFile } from "@ndn/util/test-fixture/tmpfile";
import { BufferReadableMock, BufferWritableMock } from "stream-mock";
Expand Down Expand Up @@ -131,6 +131,27 @@ test("ranged", async () => {
]);
});

test.each([
(fw, fwHint) => ({ fw, modifyInterest: { fwHint } }),
(fw, fwHint) => ({ endpoint: new Endpoint({ fw, modifyInterest: { fwHint } }) }),
] satisfies Array<(fw: Forwarder, fwHint: FwHint) => fetch.Options>)("modifyInterest %#", async (makeOpts) => {
const fwA = Forwarder.create({ dataNoTokenMatch: false });
fwA.nodeNames.push(new Name("/S"));
const server = serve("/R", new BufferChunkSource(objectBody), { endpoint: new Endpoint({ fw: fwA }) });
closers.push(server);

const fwB = Forwarder.create({ dataNoTokenMatch: false });
const bridge = Bridge.create({
fwA,
fwB,
routesAB: [],
routesBA: ["/S"],
});
closers.push(bridge);

await expect(fetch("/R", makeOpts(fwB, new FwHint("/S")))).resolves.toEqualUint8Array(objectBody);
});

describe("empty object", () => {
const handler1 = vi.fn<Parameters<ProducerHandler>, ReturnType<ProducerHandler>>(
async (interest) => new Data(interest.name, Data.ContentType(3)));
Expand All @@ -156,10 +177,13 @@ describe("empty object", () => {
expect(handler1).toHaveBeenCalled();
});

test("verify error", async () => {
test.each([
(verifier) => ({ verifier }),
(verifier) => ({ endpoint: new Endpoint({ verifier }) }),
] satisfies Array<(verifier: Verifier) => fetch.Options>)("verify error %#", async (makeOpts) => {
const verify = vi.fn<Parameters<Verifier["verify"]>, ReturnType<Verifier["verify"]>>()
.mockRejectedValue(new Error("mock-verify-error"));
await expect(fetch("/R", { verifier: { verify }, retxLimit: 0 }))
await expect(fetch("/R", { retxLimit: 0, ...makeOpts({ verify }) }))
.rejects.toThrow(/mock-verify-error/);
expect(verify).toHaveBeenCalledTimes(1);
});
Expand All @@ -183,14 +207,10 @@ test("abort", async () => {
const server = serve("/R", new IterableChunkSource(src));
closers.push(server);

const abort = new AbortController();
const { signal } = abort;
const signal = AbortSignal.timeout(150);
await Promise.all([
(async () => {
await delay(150);
abort.abort();
})(),
expect(fetch("/R", { signal })).rejects.toThrow(),
expect(fetch("/R", { endpoint: new Endpoint({ signal }) })).rejects.toThrow(),
expect(consume(fetch("/R", { signal }))).rejects.toThrow(),
expect(consume(fetch("/R", { signal }).chunks())).rejects.toThrow(),
expect(consume(fetch("/R", { signal }).unordered())).rejects.toThrow(),
Expand Down

0 comments on commit f55b905

Please sign in to comment.