diff --git a/package.json b/package.json index cb547e35..3d3a3393 100644 --- a/package.json +++ b/package.json @@ -15,9 +15,9 @@ }, "packageManager": "pnpm@9.15.4+sha512.b2dc20e2fc72b3e18848459b37359a32064663e5627a51e4c74b2c29dd8e8e0491483c3abb40789cfd578bf362fb6ba8261b05f0387d76792ed6e23ea3b1b6a0", "devDependencies": { - "@types/node": "^22.10.10", + "@types/node": "^22.12.0", "@types/wtfnode": "^0.7.3", - "@typescript/lib-dom": "npm:@types/web@0.0.199", + "@typescript/lib-dom": "npm:@types/web@0.0.200", "@vitest/coverage-v8": "^3.0.4", "@yoursunny/xo-config": "0.60.0", "codedown": "^3.2.1", diff --git a/pkg/psync/package.json b/pkg/psync/package.json index 3b574550..d81bd24a 100644 --- a/pkg/psync/package.json +++ b/pkg/psync/package.json @@ -35,14 +35,14 @@ "@ndn/util": "workspace:*", "@yoursunny/psync-bloom": "github:yoursunny/PSyncBloom-wasm#build", "murmurhash3js-revisited": "^3.0.0", - "pako": "^2.1.0", + "streaming-iterables": "^8.0.1", "tslib": "^2.8.1", + "type-fest": "^4.33.0", "typescript-event-target": "^1.1.1" }, "devDependencies": { "@ndn/fw": "workspace:^", "@ndn/l3face": "workspace:*", - "@types/murmurhash3js-revisited": "^3.0.3", - "@types/pako": "^2.0.3" + "@types/murmurhash3js-revisited": "^3.0.3" } } \ No newline at end of file diff --git a/pkg/psync/src/codec.ts b/pkg/psync/src/codec.ts index 7b3c75c0..78a7f5b6 100644 --- a/pkg/psync/src/codec.ts +++ b/pkg/psync/src/codec.ts @@ -1,5 +1,6 @@ import { Component, type Name, TT } from "@ndn/packet"; import type { BloomFilter } from "@yoursunny/psync-bloom"; +import type { Promisable } from "type-fest"; import type { PSyncCore } from "./core"; import { IBLT } from "./iblt"; @@ -9,30 +10,30 @@ export class PSyncCodec { Object.assign(this, p); } - public iblt2comp(iblt: IBLT): Component { - return new Component(TT.GenericNameComponent, this.ibltCompression.compress(iblt.serialize())); + public async iblt2comp(iblt: IBLT): Promise { + return new Component(TT.GenericNameComponent, await this.ibltCompression.compress(iblt.serialize())); } - public comp2iblt(comp: Component): IBLT { + public async comp2iblt(comp: Component): Promise { const iblt = new IBLT(this.ibltParams); - iblt.deserialize(this.ibltCompression.decompress(comp.value)); + iblt.deserialize(await this.ibltCompression.decompress(comp.value)); return iblt; } - public state2buffer(state: PSyncCore.State): Uint8Array { + public async state2buffer(state: PSyncCore.State): Promise { return this.contentCompression.compress(this.encodeState(state)); } - public buffer2state(buffer: Uint8Array): PSyncCore.State { - return this.decodeState(this.contentCompression.decompress(buffer)); + public async buffer2state(buffer: Uint8Array): Promise { + return this.decodeState(await this.contentCompression.decompress(buffer)); } } export interface PSyncCodec extends Readonly {} export namespace PSyncCodec { export interface Compression { - compress: (input: Uint8Array) => Uint8Array; - decompress: (compressed: Uint8Array) => Uint8Array; + compress: (input: Uint8Array) => Promisable; + decompress: (compressed: Uint8Array) => Promisable; } export interface Parameters { diff --git a/pkg/psync/src/full.ts b/pkg/psync/src/full.ts index 77b78b51..ab8346a9 100644 --- a/pkg/psync/src/full.ts +++ b/pkg/psync/src/full.ts @@ -143,7 +143,7 @@ export class FullSync extends TypedEventTarget implements SyncProtocol return undefined; } - const recvIblt = this.codec.comp2iblt(ibltComp); + const recvIblt = await this.codec.comp2iblt(ibltComp); const { success, positive, negative, total } = this.c.iblt.diff(recvIblt); if (!success && (total >= this.c.threshold || total === 0)) { const state = this.c.list(({ seqNum }) => seqNum > 0); @@ -195,13 +195,15 @@ export class FullSync extends TypedEventTarget implements SyncProtocol } }; - private async sendSyncData(interest: Interest, state: PSyncCore.State, action: string, recvIblt: IBLT): Promise { + private async sendSyncData( + interest: Interest, state: PSyncCore.State, action: string, recvIblt: IBLT, + ): Promise { this.debug(action, recvIblt, state); if (this.cCurrentInterestName?.equals(interest.name)) { this.scheduleSyncInterest(0); } - const server = this.pBuffer.add(interest.name, state, this.pFreshness); + const server = await this.pBuffer.add(interest.name, state, this.pFreshness); return server.processInterest(interest); } @@ -220,7 +222,7 @@ export class FullSync extends TypedEventTarget implements SyncProtocol const abort = new AbortController(); this.cAbort = abort; - const ibltComp = this.codec.iblt2comp(this.c.iblt); + const ibltComp = await this.codec.iblt2comp(this.c.iblt); const name = this.syncPrefix.append(ibltComp, GenericNumber.create(this.c.sumSeqNum)); this.cCurrentInterestName = name; this.debug("c-request"); diff --git a/pkg/psync/src/param-zlib.ts b/pkg/psync/src/param-zlib.ts index 1242633f..69164d38 100644 --- a/pkg/psync/src/param-zlib.ts +++ b/pkg/psync/src/param-zlib.ts @@ -1,13 +1,22 @@ -import { deflate, inflate } from "pako"; +import { concatBuffers } from "@ndn/util"; +import { collect } from "streaming-iterables"; import type { PSyncCodec } from "./codec"; /** Use zlib compression with PSync. */ export const PSyncZlib: PSyncCodec.Compression = { - compress(input) { - return deflate(input, { level: 9 }); + async compress(input) { + return doTransform(input, new CompressionStream("deflate")); }, - decompress(compressed) { - return inflate(compressed); + async decompress(compressed) { + return doTransform(compressed, new DecompressionStream("deflate")); }, }; + +async function doTransform(input: Uint8Array, tr: TransformStream): Promise { + const chunks = await collect( + new Blob([input]).stream() + .pipeThrough(tr) as unknown as AsyncIterable, + ); + return concatBuffers(chunks); +} diff --git a/pkg/psync/src/partial-publisher.ts b/pkg/psync/src/partial-publisher.ts index 6107b8f8..c882afc6 100644 --- a/pkg/psync/src/partial-publisher.ts +++ b/pkg/psync/src/partial-publisher.ts @@ -137,12 +137,12 @@ export class PartialPublisher extends TypedEventTarget implements Sync } const ibltComp = interest.name.at(-1); - const recvIblt = this.codec.comp2iblt(ibltComp); + const recvIblt = await this.codec.comp2iblt(ibltComp); const { success, positive, total } = this.c.iblt.diff(recvIblt); if (!success) { // TODO publish ContentType=Nack via StateProducerBuffer - const ibltComp = this.codec.iblt2comp(this.c.iblt); + const ibltComp = await this.codec.iblt2comp(this.c.iblt); const name = interest.name.append(ibltComp, Segment.create(0)); return new Data(name, Data.ContentType(0x03), Data.FreshnessPeriod(this.sFreshness), Data.FinalBlock); } @@ -205,12 +205,14 @@ export class PartialPublisher extends TypedEventTarget implements Sync } }; - private sendStateData(interest: Interest, state: PSyncCore.State, action: string, freshness: number): Promise { - const ibltComp = this.codec.iblt2comp(this.c.iblt); + private async sendStateData( + interest: Interest, state: PSyncCore.State, action: string, freshness: number, + ): Promise { + const ibltComp = await this.codec.iblt2comp(this.c.iblt); const name = interest.name.append(ibltComp); this.debug(action, interest); - const server = this.pBuffer.add(name, state, freshness); + const server = await this.pBuffer.add(name, state, freshness); return server.processInterest(interest); } } diff --git a/pkg/psync/src/state-fetcher.ts b/pkg/psync/src/state-fetcher.ts index 5c51adb5..1c2580b6 100644 --- a/pkg/psync/src/state-fetcher.ts +++ b/pkg/psync/src/state-fetcher.ts @@ -48,7 +48,7 @@ export class StateFetcher { describe: `${this.describe}[${describeSuffix}f]`, signal, }); - const state = this.codec.buffer2state(payload); + const state = await this.codec.buffer2state(payload); return { versioned, state }; } } diff --git a/pkg/psync/src/state-producer-buffer.ts b/pkg/psync/src/state-producer-buffer.ts index c3d16eaf..8fdb6bae 100644 --- a/pkg/psync/src/state-producer-buffer.ts +++ b/pkg/psync/src/state-producer-buffer.ts @@ -30,8 +30,8 @@ export class StateProducerBuffer { } } - public add(name: Name, state: PSyncCore.State, freshnessPeriod: number): Server { - const source = new BufferChunkSource(this.codec.state2buffer(state)); + public async add(name: Name, state: PSyncCore.State, freshnessPeriod: number): Promise { + const source = new BufferChunkSource(await this.codec.state2buffer(state)); const server = serveVersioned(name, source, { freshnessPeriod, pOpts: this.pOpts,