Skip to content

Commit

Permalink
segmented-object: abort fetcher upon closing FwFace
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jan 28, 2024
1 parent 4aabb80 commit 85f8c5d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 23 deletions.
7 changes: 2 additions & 5 deletions packages/fw/src/forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,9 @@ export namespace Forwarder {
defaultInstance = fw;
}

/** Delete default instance (mainly for unit testing). */
/** Close and delete default instance (mainly for unit testing). */
export function deleteDefault() {
if (!defaultInstance) {
return;
}
defaultInstance.close();
defaultInstance?.close();
defaultInstance = undefined;
}

Expand Down
8 changes: 6 additions & 2 deletions packages/segmented-object/src/fetch/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,13 @@ export class Fetcher extends TypedEventTarget<EventMap> {
private readonly rx = async (iterable: AsyncIterable<FwPacket>) => {
for await (const { l3, token, congestionMark = 0 } of iterable) {
if (l3 instanceof Data && typeof token === "number" && this.acceptContentType.includes(l3.contentType)) {
void this.handleData(l3, token, congestionMark);
await this.handleData(l3, token, congestionMark);
}
}
const ok = this.logic.end();
if (!ok) {
this.fail(new Error("fetch incomplete"));
}
};

private async handleData(data: Data, segNum: number, congestionMark: number) {
Expand Down Expand Up @@ -131,7 +135,7 @@ export class Fetcher extends TypedEventTarget<EventMap> {
}

private readonly handleAbort = () => {
this.fail(new Error("abort"));
this.fail(new Error("fetch aborted"));
};
}

Expand Down
9 changes: 7 additions & 2 deletions packages/segmented-object/src/fetch/logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class FetchLogic extends TypedEventTarget<EventMap> {
}

/** Abort. */
public close() {
public close(): void {
this.running = false;
this.dispatchTypedEvent("unblock", new Event("unblock"));
for (const [, { rtoExpiry }] of this.pending) {
Expand Down Expand Up @@ -250,7 +250,7 @@ export class FetchLogic extends TypedEventTarget<EventMap> {
* Update final segment number (inclusive) when it becomes known.
* Increasing this above opts.segmentRange[1] or a previous value has no effect.
*/
public setFinalSegNum(finalSegNum: number, estimated = false) {
public setFinalSegNum(finalSegNum: number, estimated = false): void {
if (finalSegNum >= this.finalSegNum) {
return;
}
Expand All @@ -262,6 +262,11 @@ export class FetchLogic extends TypedEventTarget<EventMap> {
}
this.dispatchTypedEvent("unblock", new Event("unblock"));
}

/** Notify that the incoming stream has ended. */
public end(): boolean {
return this.pending.size === 0;
}
}

export namespace FetchLogic {
Expand Down
51 changes: 37 additions & 14 deletions packages/segmented-object/tests/serve-fetch.t.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ afterEach(() => {
Forwarder.deleteDefault();
});

async function* generateChunksSlowly() {
const yieldSize = 8 * 1024;
for (let i = 0; i < objectBody.length; i += yieldSize) {
yield objectBody.subarray(i, i + yieldSize);
await delay(100);
}
}

test("buffer to buffer", async () => {
const chunkSource = makeChunkSource(objectBody);
expect(chunkSource).toBeInstanceOf(BufferChunkSource);
Expand Down Expand Up @@ -195,24 +203,39 @@ test("segment number convention mismatch", async () => {
});

test("abort", async () => {
const src = (async function*() {
const yieldSize = 8 * 1024;
for (let i = 0; i < objectBody.length; i += yieldSize) {
yield objectBody.subarray(i, i + yieldSize);
await delay(100);
}
})();
const server = serve("/R", new IterableChunkSource(src));
const server = serve("/R", new IterableChunkSource(generateChunksSlowly()));
closers.push(server);

const signal = AbortSignal.timeout(200);
const t0 = Date.now();
await Promise.all([
expect(fetch("/R", { signal })).rejects.toThrow(/aborted/),
expect(fetch("/R", { endpoint: new Endpoint({ signal }) })).rejects.toThrow(/aborted/),
expect(consume(fetch("/R", { signal }))).rejects.toThrow(/aborted/),
expect(consume(fetch("/R", { signal }).chunks())).rejects.toThrow(/aborted/),
expect(consume(fetch("/R", { signal }).unordered())).rejects.toThrow(/aborted/),
]);
expect(Date.now() - t0).toBeLessThan(400);
});

test("FwFace closing", async () => {
using bridge = Bridge.create({
fwB: Forwarder.getDefault(),
}).rename("S", "F");
const server = serve("/R", new IterableChunkSource(generateChunksSlowly()), {
endpoint: new Endpoint({ fw: bridge.fwS }),
});
closers.push(server);

const signal = AbortSignal.timeout(150);
setTimeout(() => Forwarder.deleteDefault(), 200);
const t0 = Date.now();
await Promise.all([
expect(fetch("/R", { signal })).rejects.toThrow(),
expect(fetch("/R", { endpoint: new Endpoint({ signal }) })).rejects.toThrow(),
expect(consume(fetch("/R", { signal }))).rejects.toThrow(),
expect(consume(fetch("/R", { signal }).chunks())).rejects.toThrow(),
expect(consume(fetch("/R", { signal }).unordered())).rejects.toThrow(),
expect(fetch("/R")).rejects.toThrow(/incomplete/),
expect(consume(fetch("/R"))).rejects.toThrow(/incomplete/),
expect(consume(fetch("/R").chunks())).rejects.toThrow(/incomplete/),
expect(consume(fetch("/R").unordered())).rejects.toThrow(/incomplete/),
]);
expect(Date.now() - t0).toBeLessThan(400);
});

test("congestion avoidance", async () => {
Expand Down

0 comments on commit 85f8c5d

Please sign in to comment.