Skip to content

Commit

Permalink
segmented-object: rewrite resize logic in IterableChunkSource
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jan 22, 2024
1 parent 3e8faa8 commit e8c352f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 53 deletions.
95 changes: 43 additions & 52 deletions packages/segmented-object/src/serve/chunk-source/iterable.ts
Original file line number Diff line number Diff line change
@@ -1,61 +1,54 @@
import { assert } from "@ndn/util";
import type { AnyIterable } from "streaming-iterables";
import { assert, concatBuffers } from "@ndn/util";
import { type AnyIterable, collect } from "streaming-iterables";

import { type Chunk, type ChunkOptions, type ChunkSource, getMaxChunkSize, getMinChunkSize } from "./common";

// NOTE(review): this span is a diff rendering whose +/- markers were lost, so lines from two
// implementations are interleaved and the span is not valid TypeScript as written.
// Statements that use `this.vector` / `this.length` / `this.minSize` / `this.maxSize` belong to
// the ScatteredChunk class; statements that use `vec` / `length` / `min` / `max` belong to the
// resize() generator factory. Presumably ScatteredChunk is the removed side and resize() the
// added side (the commit title says the resize logic was rewritten) — confirm against the real diff.
/** Gather chunks of acceptable size from scattered buffers. */
class ScatteredChunk {
constructor(private readonly minSize: number, private readonly maxSize: number) {}

private readonly vector: Uint8Array[] = [];
private length = 0;

// Queue an input buffer and add its size to the running byte count.
public append(buf: Uint8Array) {
this.vector.push(buf);
this.length += buf.byteLength;
}

// Return the next chunk, or undefined when fewer than minSize bytes are buffered
// and ignoreMinSize is false.
public gather(ignoreMinSize = false): Uint8Array | undefined {
if (!ignoreMinSize && this.length < this.minSize) {
return undefined;
}
if (this.length === 0) { // implies ignoreMinSize
return new Uint8Array();
// resize(min, max) returns a generator function that keeps `vec` (pending buffers) and
// `length` (their combined size) between calls. Called with a buffer it yields zero or more
// chunks; called without an argument it yields the pending bytes as the final chunk.
// NOTE(review): presumably concatBuffers(list, totalLength) joins the buffers into a single
// Uint8Array — confirm against @ndn/util.
function resize(min: number, max: number): (buf?: Uint8Array) => Iterable<Uint8Array> {
let vec: Uint8Array[] = [];
let length = 0;
return function*(buf) {
if (!buf) { // final chunk
return yield concatBuffers(vec, length);
}

// fast path when first buffer has acceptable size
let buf = this.vector[0]!;
if (buf.byteLength >= this.minSize && buf.byteLength <= this.maxSize) {
this.length -= buf.byteLength;
return this.vector.shift()!;
// size of the pending bytes plus the incoming buffer
const total = length + buf.length;
// acceptable size: emit right away; when nothing is pending the incoming buffer is
// passed through as-is, otherwise it is merged with the pending buffers and the state is reset
if (total >= min && total <= max) {
if (length === 0) {
yield buf;
} else {
vec.push(buf);
yield concatBuffers(vec, total);
vec = [];
length = 0;
}
return;
}

// fast path when first buffer has enough payload
if (buf.byteLength > this.maxSize) {
const output = buf.subarray(0, this.maxSize);
this.length -= this.maxSize;
this.vector[0] = buf.subarray(this.maxSize);
return output;
// still below min: keep accumulating and emit nothing for this call
if (total < min) {
vec.push(buf);
length = total;
return;
}
// assert total > max

// slow path that combines multiple buffers
const output = new Uint8Array(Math.min(this.maxSize, this.length));
for (let offset = 0; offset < output.byteLength;) {
buf = this.vector[0]!;
const rem = output.byteLength - offset;
if (buf.byteLength > rem) {
output.set(buf.subarray(0, rem), offset);
offset += rem;
this.vector[0] = buf.subarray(rem);
} else {
output.set(buf, offset);
offset += buf.byteLength;
this.vector.shift();
}
// too large: top up the pending bytes to exactly max with the head of buf and emit that
let wanted = max - length;
vec.push(buf.subarray(0, wanted));
yield concatBuffers(vec, max);

// then slice further chunks of at most max bytes straight out of buf (subarray views,
// no copy) for as long as at least min bytes remain
let off = wanted;
let rem = buf.length - wanted;
while (rem >= min) {
wanted = Math.min(rem, max);
const end = off + wanted;
yield buf.subarray(off, end);
off = end;
rem -= wanted;
}
this.length -= output.byteLength;
return output;
}

// the tail of buf (fewer than min bytes, possibly none) becomes the new pending state
vec = [buf.subarray(off)];
length = rem;
};
}

/**
Expand All @@ -75,18 +68,16 @@ export class IterableChunkSource implements ChunkSource {

// Re-chunk the input buffers and yield them as numbered chunks; the last chunk carries `final`.
// NOTE(review): this span is a diff rendering whose +/- markers were lost. The statements using
// `scattered` (ScatteredChunk) and those using `resizer` (resize) are two alternative versions
// of the same loop, not code meant to run together — confirm against the real diff.
public async *listChunks(): AsyncIterable<Chunk> {
// index of the most recently yielded chunk; incremented before every yield, so numbering starts at 0
let i = -1;
const scattered = new ScatteredChunk(this.minSize, this.maxSize);
const resizer = resize(this.minSize, this.maxSize);
for await (const buf of this.input) {
// reject anything in the input that is not a Uint8Array
assert(buf instanceof Uint8Array);
scattered.append(buf);
let payload: Uint8Array | undefined;
while (payload = scattered.gather()) { // eslint-disable-line no-cond-assign
for (const payload of resizer(buf)) {
++i;
yield { i, payload };
}
}
// after the input is exhausted, one more chunk is always yielded with `final` set to its own index
++i;
yield { i, final: i, payload: scattered.gather(true)! };
// NOTE(review): collect() is presumably used to drain the synchronous iterable returned by
// resizer() into an array so its single element can be taken — confirm against streaming-iterables.
yield { i, final: i, payload: collect(resizer())[0]! };
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/segmented-object/tests/serve-fetch.t.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe("file source", () => {

test("iterable to unordered", async () => {
const chunkSource = makeChunkSource((async function*() {
const yieldSizes = [5000, 7000, 3000];
const yieldSizes = [5000, 7000, 20000];
let i = -1;
for (let offset = 0; offset < objectBody.length;) {
const end = offset + yieldSizes[++i % yieldSizes.length]!;
Expand Down

0 comments on commit e8c352f

Please sign in to comment.