Skip to content

Commit

Permalink
psync: pako => CompressionStream
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jan 30, 2025
1 parent 255a5fd commit 18539a4
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 31 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
},
"packageManager": "pnpm@9.15.4+sha512.b2dc20e2fc72b3e18848459b37359a32064663e5627a51e4c74b2c29dd8e8e0491483c3abb40789cfd578bf362fb6ba8261b05f0387d76792ed6e23ea3b1b6a0",
"devDependencies": {
"@types/node": "^22.10.10",
"@types/node": "^22.12.0",
"@types/wtfnode": "^0.7.3",
"@typescript/lib-dom": "npm:@types/web@0.0.199",
"@typescript/lib-dom": "npm:@types/web@0.0.200",
"@vitest/coverage-v8": "^3.0.4",
"@yoursunny/xo-config": "0.60.0",
"codedown": "^3.2.1",
Expand Down
6 changes: 3 additions & 3 deletions pkg/psync/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
"@ndn/util": "workspace:*",
"@yoursunny/psync-bloom": "github:yoursunny/PSyncBloom-wasm#build",
"murmurhash3js-revisited": "^3.0.0",
"pako": "^2.1.0",
"streaming-iterables": "^8.0.1",
"tslib": "^2.8.1",
"type-fest": "^4.33.0",
"typescript-event-target": "^1.1.1"
},
"devDependencies": {
"@ndn/fw": "workspace:^",
"@ndn/l3face": "workspace:*",
"@types/murmurhash3js-revisited": "^3.0.3",
"@types/pako": "^2.0.3"
"@types/murmurhash3js-revisited": "^3.0.3"
}
}
19 changes: 10 additions & 9 deletions pkg/psync/src/codec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Component, type Name, TT } from "@ndn/packet";
import type { BloomFilter } from "@yoursunny/psync-bloom";
import type { Promisable } from "type-fest";

import type { PSyncCore } from "./core";
import { IBLT } from "./iblt";
Expand All @@ -9,30 +10,30 @@ export class PSyncCodec {
Object.assign(this, p);
}

public iblt2comp(iblt: IBLT): Component {
return new Component(TT.GenericNameComponent, this.ibltCompression.compress(iblt.serialize()));
public async iblt2comp(iblt: IBLT): Promise<Component> {
return new Component(TT.GenericNameComponent, await this.ibltCompression.compress(iblt.serialize()));
}

public comp2iblt(comp: Component): IBLT {
public async comp2iblt(comp: Component): Promise<IBLT> {
const iblt = new IBLT(this.ibltParams);
iblt.deserialize(this.ibltCompression.decompress(comp.value));
iblt.deserialize(await this.ibltCompression.decompress(comp.value));
return iblt;
}

public state2buffer(state: PSyncCore.State): Uint8Array {
public async state2buffer(state: PSyncCore.State): Promise<Uint8Array> {
return this.contentCompression.compress(this.encodeState(state));
}

public buffer2state(buffer: Uint8Array): PSyncCore.State {
return this.decodeState(this.contentCompression.decompress(buffer));
public async buffer2state(buffer: Uint8Array): Promise<PSyncCore.State> {
return this.decodeState(await this.contentCompression.decompress(buffer));
}
}
export interface PSyncCodec extends Readonly<PSyncCodec.Parameters> {}

export namespace PSyncCodec {
export interface Compression {
compress: (input: Uint8Array) => Uint8Array;
decompress: (compressed: Uint8Array) => Uint8Array;
compress: (input: Uint8Array) => Promisable<Uint8Array>;
decompress: (compressed: Uint8Array) => Promisable<Uint8Array>;
}

export interface Parameters {
Expand Down
10 changes: 6 additions & 4 deletions pkg/psync/src/full.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ export class FullSync extends TypedEventTarget<EventMap> implements SyncProtocol
return undefined;
}

const recvIblt = this.codec.comp2iblt(ibltComp);
const recvIblt = await this.codec.comp2iblt(ibltComp);
const { success, positive, negative, total } = this.c.iblt.diff(recvIblt);
if (!success && (total >= this.c.threshold || total === 0)) {
const state = this.c.list(({ seqNum }) => seqNum > 0);
Expand Down Expand Up @@ -195,13 +195,15 @@ export class FullSync extends TypedEventTarget<EventMap> implements SyncProtocol
}
};

private async sendSyncData(interest: Interest, state: PSyncCore.State, action: string, recvIblt: IBLT): Promise<Data | undefined> {
private async sendSyncData(
interest: Interest, state: PSyncCore.State, action: string, recvIblt: IBLT,
): Promise<Data | undefined> {
this.debug(action, recvIblt, state);
if (this.cCurrentInterestName?.equals(interest.name)) {
this.scheduleSyncInterest(0);
}

const server = this.pBuffer.add(interest.name, state, this.pFreshness);
const server = await this.pBuffer.add(interest.name, state, this.pFreshness);
return server.processInterest(interest);
}

Expand All @@ -220,7 +222,7 @@ export class FullSync extends TypedEventTarget<EventMap> implements SyncProtocol

const abort = new AbortController();
this.cAbort = abort;
const ibltComp = this.codec.iblt2comp(this.c.iblt);
const ibltComp = await this.codec.iblt2comp(this.c.iblt);
const name = this.syncPrefix.append(ibltComp, GenericNumber.create(this.c.sumSeqNum));
this.cCurrentInterestName = name;
this.debug("c-request");
Expand Down
19 changes: 14 additions & 5 deletions pkg/psync/src/param-zlib.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import { deflate, inflate } from "pako";
import { concatBuffers } from "@ndn/util";
import { collect } from "streaming-iterables";

import type { PSyncCodec } from "./codec";

/** Use zlib compression with PSync. */
export const PSyncZlib: PSyncCodec.Compression = {
compress(input) {
return deflate(input, { level: 9 });
async compress(input) {
return doTransform(input, new CompressionStream("deflate"));
},
decompress(compressed) {
return inflate(compressed);
async decompress(compressed) {
return doTransform(compressed, new DecompressionStream("deflate"));
},
};

async function doTransform(input: Uint8Array, tr: TransformStream<Uint8Array, Uint8Array>): Promise<Uint8Array> {
const chunks = await collect(
new Blob([input]).stream()
.pipeThrough(tr) as unknown as AsyncIterable<Uint8Array>,
);
return concatBuffers(chunks);
}
12 changes: 7 additions & 5 deletions pkg/psync/src/partial-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ export class PartialPublisher extends TypedEventTarget<EventMap> implements Sync
}

const ibltComp = interest.name.at(-1);
const recvIblt = this.codec.comp2iblt(ibltComp);
const recvIblt = await this.codec.comp2iblt(ibltComp);

const { success, positive, total } = this.c.iblt.diff(recvIblt);
if (!success) {
// TODO publish ContentType=Nack via StateProducerBuffer
const ibltComp = this.codec.iblt2comp(this.c.iblt);
const ibltComp = await this.codec.iblt2comp(this.c.iblt);
const name = interest.name.append(ibltComp, Segment.create(0));
return new Data(name, Data.ContentType(0x03), Data.FreshnessPeriod(this.sFreshness), Data.FinalBlock);
}
Expand Down Expand Up @@ -205,12 +205,14 @@ export class PartialPublisher extends TypedEventTarget<EventMap> implements Sync
}
};

private sendStateData(interest: Interest, state: PSyncCore.State, action: string, freshness: number): Promise<Data | undefined> {
const ibltComp = this.codec.iblt2comp(this.c.iblt);
private async sendStateData(
interest: Interest, state: PSyncCore.State, action: string, freshness: number,
): Promise<Data | undefined> {
const ibltComp = await this.codec.iblt2comp(this.c.iblt);
const name = interest.name.append(ibltComp);

this.debug(action, interest);
const server = this.pBuffer.add(name, state, freshness);
const server = await this.pBuffer.add(name, state, freshness);
return server.processInterest(interest);
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/psync/src/state-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class StateFetcher {
describe: `${this.describe}[${describeSuffix}f]`,
signal,
});
const state = this.codec.buffer2state(payload);
const state = await this.codec.buffer2state(payload);
return { versioned, state };
}
}
4 changes: 2 additions & 2 deletions pkg/psync/src/state-producer-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ export class StateProducerBuffer {
}
}

public add(name: Name, state: PSyncCore.State, freshnessPeriod: number): Server {
const source = new BufferChunkSource(this.codec.state2buffer(state));
public async add(name: Name, state: PSyncCore.State, freshnessPeriod: number): Promise<Server> {
const source = new BufferChunkSource(await this.codec.state2buffer(state));
const server = serveVersioned(name, source, {
freshnessPeriod,
pOpts: this.pOpts,
Expand Down

0 comments on commit 18539a4

Please sign in to comment.